Compare commits

..

79 Commits

Author SHA1 Message Date
Konstantin Knizhnik
9ea17556b3 Fix calculation of logical delta layer space 2023-05-19 11:03:29 +03:00
Konstantin Knizhnik
a5db639a2f More precisely calculate logical image size 2023-05-18 17:58:15 +03:00
Konstantin Knizhnik
64aa8e5c6b Fi unit tests 2023-05-18 00:07:44 +03:00
Konstantin Knizhnik
6a1f2c8b71 Use last_record_lsn to collect keyspace in GC 2023-05-17 22:57:09 +03:00
Konstantin Knizhnik
a10ba532dd Reduce write amplification for wanted image layers 2023-05-17 14:19:56 +03:00
Konstantin Knizhnik
f783596825 Add missed assert to keyspace_add_range test 2023-05-16 21:22:21 +03:00
Konstantin Knizhnik
4b2b175db8 Replace env.poostgres with env.endpoints 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3902868e68 Replace env.postgres with env.endpoint 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
1ffca3eadb Undo merge problem 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
b499ade206 Remove contains method from LayerMap API 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
0cd01e33c3 Add LayerMap::contains method 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
95dd5c71bf Prohibit insertion fo dulicated layers in layer map 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
c2731e17a9 Prevent duplicated layer in LayerMap 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
a7cf926aeb Reduce live time of wanted image layer lock 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
ffdf7df2ea Update pageserver/src/tenant/timeline.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
74ab232afb Update pageserver/src/tenant/timeline.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
2cf02b381c Update pageserver/src/keyspace.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
6d687c198b Fix pythin style warnings 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
755166e275 Simplified version of test_gc_feedback 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
7d4ebf8485 Update pageserver/src/keyspace.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
9b9b125d13 Make clippy happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
4d76c2916e Apply black to test_gc_feedback 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
199771371c Make ruff happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
43187715d6 Add KeySpaceRandomAccum 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
1e63fc99db Move test_gc_feedback test to performance 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
7b58f82f7b Move test_gc_feedback test to performance 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
2af45505b8 test_runner/performance/test_gc_feedback.py 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3c5b99b4b9 Rename test_gc_feedback test 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
8b05a87f75 Add test that no redundant image are generatd if them are wanted by GC 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3bc4a7c1e2 Add test that no redundant image are generatd if them are wanted by GC 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3d0a51567f Fix KeySpace.add_range 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
7e6dbc32d1 Update pageserver/src/tenant/timeline.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
f12f6b0275 Fix pythin style 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
0fbd85f64b Fix python style violations in test_gc_old_layers.py 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
af75d59b4c Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
4618739cb3 Update test_runner/regress/test_gc_old_layers.py
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
f838a11514 Update pageserver/src/keyspace.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
f0fe03ea80 Make clippy happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
be22be7b24 Make clippy happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
451479305e Use KeySpace for passing infirmation about wanted image layers from GC to copaction task 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
5e690307fb Avoid redundant generation of wanted image layers if such layer already exists beyond GC cutoff horizon 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3e6288d7d8 Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
2d015a1464 Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
0785d92577 Add isort happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
fcb9bac847 Revert changes in key space partitioning 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
7a3d6531b8 Revert "fix KeySpace initialization in bench_layer_map.rs"
This reverts commit 63b1fcb813ca5f40a2b1328d4cb6e21646fba69f.
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
3275305a30 Revert "Split keyspace in partitions without holes"
This reverts commit 02c0e9082f804ccf201fe1cf07eb167b697ea9a3.
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
0deca452bf Add comments 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
98de2a6d93 Update test_runner/regress/test_gc_old_layers.py
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
88257b91d7 Update test_runner/regress/test_gc_old_layers.py
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
e8066631a6 Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
e069c409ef Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
7f81d57d52 Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
787c4a8bbb Update pageserver/src/tenant/timeline.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
6ec9922184 Make clippy happy 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
9b241f29cd Remove sleep at the end of test_gc_old_layers.py 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
9b418a71ac fix KeySpace initialization in bench_layer_map.rs 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
1bb8ca0806 Split keyspace in partitions without holes 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
a1c8e74fb9 Add test for GC of stairs layers 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
f9999c84d9 Rebase with main 2023-05-16 21:18:20 +03:00
Konstantin Knizhnik
c01c31d045 Add comment exlaining wanted_image_layers 2023-05-16 21:18:19 +03:00
Konstantin Knizhnik
4da24ba34f Pass set of wanted image layers from GC to compaction 2023-05-16 21:18:19 +03:00
Alexander Bayandin
a5615bd8ea Fix Allure reports for different benchmark jobs (#4229)
- Fix Allure report generation failure for Nightly Benchmarks
- Fix GitHub Autocomment for `run-benchmarks` label
(`build_and_test.yml::benchmarks` job)
2023-05-15 13:04:03 +01:00
Joonas Koivunen
4a76f2b8d6 upload new timeline index part json before 201 or on retry (#4204)
Await for upload to complete before returning 201 Created on
`branch_timeline` or when `bootstrap_timeline` happens. Should either of
those waits fail, then on the retried request await for uploads again.
This should work as expected assuming control-plane does not start to
use timeline creation as a wait_for_upload mechanism.

Fixes #3865, started from
https://github.com/neondatabase/neon/pull/3857/files#r1144468177

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-15 14:16:43 +03:00
Shany Pozin
9cd6f2ceeb Remove duplicated logic in creating TenantConfOpt (#4230)
## Describe your changes

Remove duplicated logic in creating TenantConfOpt in both TryFrom of
TenantConfigRequest and TenantCreateRequest
2023-05-15 10:08:44 +03:00
Heikki Linnakangas
2855c73990 Fix race condition after attaching tenant with branches. (#4170)
After tenant attach, there is a window where the child timeline is
loaded and accepts GetPage requests, but its parent is not. If a
GetPage request needs to traverse to the parent, it needs to wait for
the parent timeline to become active, or it might miss some records on
the parent timeline.

It's also possible that the parent timeline is active, but it hasn't
yet received all the WAL up to the branch point from the safekeeper.
This happens if a pageserver crashes soon after creating a timeline,
so that the WAL leading to the branch point has not yet been uploaded
to remote storage. After restart, the WAL will be re-streamed and
ingested from the safekeeper, but that takes a while. Because of that,
it's not enough to check that the parent timeline is active, we also
need to wait for the WAL to arrive on the parent timeline, just like
at the beginning of GetPage handling. We probably should change the
behavior at create_timeline so that a timeline can only be created
after all the WAL up to the branch point has been uploaded to remote
storage, but that's not currently the case and out of scope for this
PR (see github issue #4218).

@NanoBjorn encountered this while working on tenant migration. After
migrating a tenant with a parent and child branch, connecting to the
child branch failed with an error like:

```
FATAL:  "base/16385" is not a valid data directory
DETAIL:  File "base/16385/PG_VERSION" is missing.
```

This commit adds two tests that reproduce the bug, with slightly
different symptoms.
2023-05-13 10:44:11 +03:00
Christian Schwarz
edcf4d61a4 distinguish imitated from real size::gather_input calls in metrics (#4224)
Before this PR, the gather_inputs() calls made to imitate synthetic size
calculation accesses were accounted towards the real logical size
calculation metric.

This PR forces all callers to declare the cause for making logical size
calculations, making the decision which cause counts towards which
metric explicit.

This is follow-up to

```
commit 1d266a6365
Author: Christian Schwarz <christian@neon.tech>
Date:   Thu May 11 16:09:29 2023 +0200

    logical size calculation metrics: differentiate regular vs imitated (#4197)
```

After merging this patch, I hope to be able to explain why we have ca
30x more "logical size" ops in prod than "imitate logical size" for any
given observation interval.

refs https://github.com/neondatabase/neon/issues/4154
2023-05-12 17:57:33 +00:00
Christian Schwarz
a2a9c598be add counter metric that increases whenever a background loop overruns its period (#4223)
We already have the warn!() log line for this condition. This PR adds a
corresponding metric on which we can have a dedicated alert. Cheaper and
more reliable than alerting on the logs, because, we run into log rate
limits from time to time these days.

refs https://github.com/neondatabase/neon/issues/4222
2023-05-12 19:00:06 +03:00
Alexander Bayandin
bb06d281ea Run regressions tests on both Postgres 14 and 15 (#4192)
This PR adds tests runs on Postgres 15 and created unified Allure report
with results for all tests.

- Split `.github/actions/allure-report` into
`.github/actions/allure-report-store` and
`.github/actions/allure-report-generate`
- Add debug or release pytest parameter for all tests (depending on
`BUILD_TYPE` env variable)
- Add Postgres version as a pytest parameter for all tests (depending on
`DEFAULT_PG_VERSION` env variable)
- Fix `test_wal_restore` and `restore_from_wal.sh` to support path with
`[`/`]` in it (fixed by applying spellcheck to the script and fixing all
warnings), `restore_from_wal_archive.sh` is deleted as unused.
- All known failures on Postgres 15 marked with xfail
2023-05-12 15:28:51 +01:00
Christian Schwarz
5869234290 logical size calculation: spawn with in_current_span (#4196)
While investigating https://github.com/neondatabase/neon/issues/4154 I
found that the `Calculating logical size for timeline` tracing events
created from within the logical size computation code are not always
attributable to the background task that caused it.

My goal is to be able to distinguish in the logs whether a `Calculating
logical size for timeline` was logged as part of a real synthetic size
calculation VS an imitation by the eviction task.

I want this distinction so I can prove my assumption that the disk IO
peaks which we see every 24h on prod are due to eviction's imitate
synthetic size calculations.

The alternative here, which I would have preferred, but is more work:
link RequestContext's into a child->parent list and dump this list when
we log `Calculating logical size for timeline`.

I would have preferred that over what we have in this PR because,
technically, the ondemand logical size computation can outlive the
caller that spawned it. This is against the idea of correctly nested
spans.

I guess in OpenTelemetry land, the correct modelling would be a link
between the caller's span and the task_mgr task's span.

Anyways, I think the case where we hang up on the spawned ondemand
logical size calculation is quite rare. So, I'm willing to tolerate
incorrectly nested spans for these edge-cases.

refs https://github.com/neondatabase/neon/issues/4154
2023-05-12 15:36:30 +02:00
Rahul Modpur
ecfe4757d3 fix bogus at character context in log messages
Signed-off-by: Rahul Modpur <rmodpur2@gmail.com>
2023-05-11 23:31:42 +01:00
Christian Schwarz
845e296562 eviction: add global histogram for iteration durations (#4212)
I would like to know whether and by how much the eviction iterations
spike in the $period-sized window that happens every $threshold , when
all the timelines do the imitate accesses.

refs https://github.com/neondatabase/neon/issues/4154
2023-05-11 18:02:19 +03:00
Heikki Linnakangas
1988cc5527 Fix failpoint_sleep_millis_async without use std::time::Duration (#4195)
I tried to use failpoint_sleep_millis_async(...) in a source file that
didn't do `use std::time::Duration`, and got a compiler error:

```
error[E0433]: failed to resolve: use of undeclared type `Duration`
   --> pageserver/src/walingest.rs:316:17
    |
316 |                 utils::failpoint_sleep_millis_async!("wal-ingest-logical-message-sleep");
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope
    |
    = note: this error originates in the macro `utils::failpoint_sleep_millis_async` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider importing one of these items
    |
24  | use chrono::Duration;
    |
24  | use core::time::Duration;
    |
24  | use humantime::Duration;
    |
24  | use serde_with::__private__::Duration;
    |
      and 2 other candidates
```
2023-05-11 17:53:42 +03:00
Christian Schwarz
1d266a6365 logical size calculation metrics: differentiate regular vs imitated (#4197)
I want this distinction so I can prove my assumption that the disk IO
peaks which we see every 24h on prod are due to eviction's imitate
synthetic size calculations.

refs https://github.com/neondatabase/neon/issues/4154
2023-05-11 17:09:29 +03:00
Christian Schwarz
80522a1b9d replace has_in_progress_downloads with new attachment_status field (#4168)
Control Plane currently [^1] polls for `has_in_progress_downloads ==
false` after /attach to determine that an attach operation succeeded.

As pointed out in the OpenAPI spec as of neon#4151, polling for
`has_in_progress_downloads` is incorrect.

This patch changes the situation by
- removing `has_in_progress_downloads`
- adding a new field `attachment_status.`
- changing instructions for `/attach` to poll for `attachment_status ==
attached`.

This makes the instructions in `/attach` actionable for Control Plane.
NB that we don't expose the TenantState in the OpenAPI docs, even though
we expose it in the endpoint. That is with good reason because we don't
want to commit to a fixed set of tenant states forever. Hence, the
separate `attachment_status` field that exposes the bare minimum
required to make /attach + subsequent polling 100% safe wrt split brain.

It would have been nice to report failures explicitly, but the problem
is that we lose that state when we restart. So, we return `attached`
upon attach failure. The tenant is Broken in that case, causing Control
Plane's subsequent health check will fail. Control Plane can roll back
the relocation operation then.
NB: the reliance on the subsequent health check is no change to what we
had before this patch!
NB: we can always add additional TenantAttachmentStatus'es in the future
to communicate failure.

This PR also moves the attach-marker file's creation to the API
handler's synchronous part. That was done to avoid the need to
distinguish
* `Attaching but marker not yet written => AttachmentStatus::Maybe` from
* `Attaching, marker written, but attach failed for other reason =>
AttachmentStatus::Attached`

Coincidentally, this also adds more transactionality to the /attach API
because we only return 202 once we've written the marker file. But, in
the end, it doesn't affect how the control plane interacts with us or
how it needs to do retries. So, we don't mention any of this in the API
docs.

[^1]: The one-click tenant relocation PR cloud#4740, currently WIP, is
      the first real user.
2023-05-11 16:53:46 +03:00
Joonas Koivunen
ecced13d90 try: higher page_service timeouts to isolate an issue (#4206)
See #4205.
2023-05-11 16:14:42 +03:00
Alexander Bayandin
59510f6449 scripts/flaky_tests.py: use retriesStatusChange from Allure 2023-05-10 16:59:03 +01:00
Alexander Bayandin
7fc778d251 GitHub Autocomment: fix flaky test notifications 2023-05-10 16:59:03 +01:00
Alexander Bayandin
1d490b2311 Make benchmark_fixture less noisy 2023-05-10 16:59:03 +01:00
46 changed files with 2023 additions and 1655 deletions

View File

@@ -0,0 +1,184 @@
name: 'Create Allure report'
description: 'Generate Allure report from uploaded by actions/allure-report-store tests results'
outputs:
report-url:
description: 'Allure report URL'
value: ${{ steps.generate-report.outputs.report-url }}
report-json-url:
description: 'Allure report JSON URL'
value: ${{ steps.generate-report.outputs.report-json-url }}
runs:
using: "composite"
steps:
# We're using some of env variables quite offen, so let's set them once.
#
# It would be nice to have them set in common runs.env[0] section, but it doesn't work[1]
#
# - [0] https://docs.github.com/en/actions/creating-actions/metadata-syntax-for-github-actions#runsenv
# - [1] https://github.com/neondatabase/neon/pull/3907#discussion_r1154703456
#
- name: Set variables
shell: bash -euxo pipefail {0}
run: |
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for special branches
BRANCH_OR_PR=${GITHUB_REF_NAME}
else
BRANCH_OR_PR=branch-$(printf "${GITHUB_REF_NAME}" | tr -c "[:alnum:]._-" "-")
fi
LOCK_FILE=reports/${BRANCH_OR_PR}/lock.txt
WORKDIR=/tmp/${BRANCH_OR_PR}-$(date +%s)
mkdir -p ${WORKDIR}
echo "BRANCH_OR_PR=${BRANCH_OR_PR}" >> $GITHUB_ENV
echo "LOCK_FILE=${LOCK_FILE}" >> $GITHUB_ENV
echo "WORKDIR=${WORKDIR}" >> $GITHUB_ENV
echo "BUCKET=${BUCKET}" >> $GITHUB_ENV
env:
BUCKET: neon-github-public-dev
# TODO: We can replace with a special docker image with Java and Allure pre-installed
- uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: '17'
- name: Install Allure
shell: bash -euxo pipefail {0}
run: |
if ! which allure; then
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
wget -q https://github.com/allure-framework/allure2/releases/download/${ALLURE_VERSION}/${ALLURE_ZIP}
echo "${ALLURE_ZIP_MD5} ${ALLURE_ZIP}" | md5sum -c
unzip -q ${ALLURE_ZIP}
echo "$(pwd)/allure-${ALLURE_VERSION}/bin" >> $GITHUB_PATH
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.22.0
ALLURE_ZIP_MD5: d5c9f0989b896482536956340a7d5ec9
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this
- name: Acquire lock
shell: bash -euxo pipefail {0}
run: |
LOCK_TIMEOUT=300 # seconds
LOCK_CONTENT="${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}"
echo ${LOCK_CONTENT} > ${WORKDIR}/lock.txt
# Do it up to 5 times to avoid race condition
for _ in $(seq 1 5); do
for i in $(seq 1 ${LOCK_TIMEOUT}); do
LOCK_ACQUIRED=$(aws s3api head-object --bucket neon-github-public-dev --key ${LOCK_FILE} | jq --raw-output '.LastModified' || true)
# `date --date="..."` is supported only by gnu date (i.e. it doesn't work on BSD/macOS)
if [ -z "${LOCK_ACQUIRED}" ] || [ "$(( $(date +%s) - $(date --date="${LOCK_ACQUIRED}" +%s) ))" -gt "${LOCK_TIMEOUT}" ]; then
break
fi
sleep 1
done
aws s3 mv --only-show-errors ${WORKDIR}/lock.txt "s3://${BUCKET}/${LOCK_FILE}"
# Double-check that exactly THIS run has acquired the lock
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt
if [ "$(cat lock.txt)" = "${LOCK_CONTENT}" ]; then
break
fi
done
- name: Generate and publish final Allure report
id: generate-report
shell: bash -euxo pipefail {0}
run: |
REPORT_PREFIX=reports/${BRANCH_OR_PR}
RAW_PREFIX=reports-raw/${BRANCH_OR_PR}/${GITHUB_RUN_ID}
# Get previously uploaded data for this run
ZSTD_NBTHREADS=0
S3_FILEPATHS=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${RAW_PREFIX}/ | jq --raw-output '.Contents[].Key')
if [ -z "$S3_FILEPATHS" ]; then
# There's no previously uploaded data for this $GITHUB_RUN_ID
exit 0
fi
for S3_FILEPATH in ${S3_FILEPATHS}; do
time aws s3 cp --only-show-errors "s3://${BUCKET}/${S3_FILEPATH}" "${WORKDIR}"
archive=${WORKDIR}/$(basename $S3_FILEPATH)
mkdir -p ${archive%.tar.zst}
time tar -xf ${archive} -C ${archive%.tar.zst}
rm -f ${archive}
done
# Get history trend
time aws s3 cp --recursive --only-show-errors "s3://${BUCKET}/${REPORT_PREFIX}/latest/history" "${WORKDIR}/latest/history" || true
# Generate report
time allure generate --clean --output ${WORKDIR}/report ${WORKDIR}/*
# Replace a logo link with a redirect to the latest version of the report
sed -i 's|<a href="." class=|<a href="https://'${BUCKET}'.s3.amazonaws.com/'${REPORT_PREFIX}'/latest/index.html?nocache='"'+Date.now()+'"'" class=|g' ${WORKDIR}/report/app.js
# Upload a history and the final report (in this particular order to not to have duplicated history in 2 places)
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report/history" "s3://${BUCKET}/${REPORT_PREFIX}/latest/history"
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html
# Generate redirect
cat <<EOF > ${WORKDIR}/index.html
<!DOCTYPE html>
<meta charset="utf-8">
<title>Redirecting to ${REPORT_URL}</title>
<meta http-equiv="refresh" content="0; URL=${REPORT_URL}">
EOF
time aws s3 cp --only-show-errors ${WORKDIR}/index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
- name: Release lock
if: always()
shell: bash -euxo pipefail {0}
run: |
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt || exit 0
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}" ]; then
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi
- name: Cleanup
if: always()
shell: bash -euxo pipefail {0}
run: |
if [ -d "${WORKDIR}" ]; then
rm -rf ${WORKDIR}
fi
- uses: actions/github-script@v6
if: always()
env:
REPORT_URL: ${{ steps.generate-report.outputs.report-url }}
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
with:
script: |
const { REPORT_URL, COMMIT_SHA } = process.env
await github.rest.repos.createCommitStatus({
owner: context.repo.owner,
repo: context.repo.repo,
sha: `${COMMIT_SHA}`,
state: 'success',
target_url: `${REPORT_URL}`,
context: 'Allure report',
})

View File

@@ -0,0 +1,72 @@
name: 'Store Allure results'
description: 'Upload test results to be used by actions/allure-report-generate'
inputs:
report-dir:
description: 'directory with test results generated by tests'
required: true
unique-key:
description: 'string to distinguish different results in the same run'
required: true
runs:
using: "composite"
steps:
- name: Set variables
shell: bash -euxo pipefail {0}
run: |
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for special branches
BRANCH_OR_PR=${GITHUB_REF_NAME}
else
BRANCH_OR_PR=branch-$(printf "${GITHUB_REF_NAME}" | tr -c "[:alnum:]._-" "-")
fi
echo "BRANCH_OR_PR=${BRANCH_OR_PR}" >> $GITHUB_ENV
echo "REPORT_DIR=${REPORT_DIR}" >> $GITHUB_ENV
env:
REPORT_DIR: ${{ inputs.report-dir }}
- name: Upload test results
shell: bash -euxo pipefail {0}
run: |
REPORT_PREFIX=reports/${BRANCH_OR_PR}
RAW_PREFIX=reports-raw/${BRANCH_OR_PR}/${GITHUB_RUN_ID}
# Add metadata
cat <<EOF > ${REPORT_DIR}/executor.json
{
"name": "GitHub Actions",
"type": "github",
"url": "https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/latest/index.html",
"buildOrder": ${GITHUB_RUN_ID},
"buildName": "GitHub Actions Run #${GITHUB_RUN_NUMBER}/${GITHUB_RUN_ATTEMPT}",
"buildUrl": "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}/attempts/${GITHUB_RUN_ATTEMPT}",
"reportUrl": "https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html",
"reportName": "Allure Report"
}
EOF
cat <<EOF > ${REPORT_DIR}/environment.properties
COMMIT_SHA=${COMMIT_SHA}
EOF
ARCHIVE="${UNIQUE_KEY}-${GITHUB_RUN_ATTEMPT}-$(date +%s).tar.zst"
ZSTD_NBTHREADS=0
time tar -C ${REPORT_DIR} -cf ${ARCHIVE} --zstd .
time aws s3 mv --only-show-errors ${ARCHIVE} "s3://${BUCKET}/${RAW_PREFIX}/${ARCHIVE}"
env:
UNIQUE_KEY: ${{ inputs.unique-key }}
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
BUCKET: neon-github-public-dev
- name: Cleanup
if: always()
shell: bash -euxo pipefail {0}
run: |
rm -rf ${REPORT_DIR}

View File

@@ -1,254 +0,0 @@
name: 'Create Allure report'
description: 'Create and publish Allure report'
inputs:
action:
desctiption: 'generate or store'
required: true
build_type:
description: '`build_type` from run-python-test-set action'
required: true
test_selection:
description: '`test_selector` from run-python-test-set action'
required: false
outputs:
report-url:
description: 'Allure report URL'
value: ${{ steps.generate-report.outputs.report-url }}
report-json-url:
description: 'Allure report JSON URL'
value: ${{ steps.generate-report.outputs.report-json-url }}
runs:
using: "composite"
steps:
# We're using some of env variables quite offen, so let's set them once.
#
# It would be nice to have them set in common runs.env[0] section, but it doesn't work[1]
#
# - [0] https://docs.github.com/en/actions/creating-actions/metadata-syntax-for-github-actions#runsenv
# - [1] https://github.com/neondatabase/neon/pull/3907#discussion_r1154703456
#
- name: Set common environment variables
shell: bash -euxo pipefail {0}
run: |
echo "BUILD_TYPE=${BUILD_TYPE}" >> $GITHUB_ENV
echo "BUCKET=${BUCKET}" >> $GITHUB_ENV
echo "TEST_OUTPUT=${TEST_OUTPUT}" >> $GITHUB_ENV
env:
BUILD_TYPE: ${{ inputs.build_type }}
BUCKET: neon-github-public-dev
TEST_OUTPUT: /tmp/test_output
- name: Validate input parameters
shell: bash -euxo pipefail {0}
run: |
if [ "${{ inputs.action }}" != "store" ] && [ "${{ inputs.action }}" != "generate" ]; then
echo >&2 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
exit 1
fi
if [ -z "${{ inputs.test_selection }}" ] && [ "${{ inputs.action }}" == "store" ]; then
echo >&2 "inputs.test_selection must be set for 'store' action"
exit 2
fi
- name: Calculate variables
id: calculate-vars
shell: bash -euxo pipefail {0}
run: |
# TODO: for manually triggered workflows (via workflow_dispatch) we need to have a separate key
pr_number=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${pr_number}" != "null" ]; then
key=pr-${pr_number}
elif [ "${GITHUB_REF_NAME}" = "main" ]; then
# Shortcut for a special branch
key=main
elif [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for a special branch
key=release
else
key=branch-$(printf "${GITHUB_REF_NAME}" | tr -c "[:alnum:]._-" "-")
fi
echo "KEY=${key}" >> $GITHUB_OUTPUT
# Sanitize test selection to remove `/` and any other special characters
# Use printf instead of echo to avoid having `\n` at the end of the string
test_selection=$(printf "${{ inputs.test_selection }}" | tr -c "[:alnum:]._-" "-" )
echo "TEST_SELECTION=${test_selection}" >> $GITHUB_OUTPUT
- uses: actions/setup-java@v3
if: ${{ inputs.action == 'generate' }}
with:
distribution: 'temurin'
java-version: '17'
- name: Install Allure
if: ${{ inputs.action == 'generate' }}
shell: bash -euxo pipefail {0}
run: |
if ! which allure; then
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
wget -q https://github.com/allure-framework/allure2/releases/download/${ALLURE_VERSION}/${ALLURE_ZIP}
echo "${ALLURE_ZIP_MD5} ${ALLURE_ZIP}" | md5sum -c
unzip -q ${ALLURE_ZIP}
echo "$(pwd)/allure-${ALLURE_VERSION}/bin" >> $GITHUB_PATH
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.21.0
ALLURE_ZIP_MD5: c8db4dd8e2a7882583d569ed2c82879c
- name: Upload Allure results
if: ${{ inputs.action == 'store' }}
env:
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
shell: bash -euxo pipefail {0}
run: |
# Add metadata
cat <<EOF > $TEST_OUTPUT/allure/results/executor.json
{
"name": "GitHub Actions",
"type": "github",
"url": "https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/latest/index.html",
"buildOrder": ${GITHUB_RUN_ID},
"buildName": "GitHub Actions Run #${{ github.run_number }}/${GITHUB_RUN_ATTEMPT}",
"buildUrl": "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}/attempts/${GITHUB_RUN_ATTEMPT}",
"reportUrl": "https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html",
"reportName": "Allure Report"
}
EOF
cat <<EOF > $TEST_OUTPUT/allure/results/environment.properties
TEST_SELECTION=${{ inputs.test_selection }}
BUILD_TYPE=${BUILD_TYPE}
EOF
ARCHIVE="${GITHUB_RUN_ID}-${TEST_SELECTION}-${GITHUB_RUN_ATTEMPT}-$(date +%s).tar.zst"
ZSTD_NBTHREADS=0
tar -C ${TEST_OUTPUT}/allure/results -cf ${ARCHIVE} --zstd .
aws s3 mv --only-show-errors ${ARCHIVE} "s3://${BUCKET}/${RAW_PREFIX}/${ARCHIVE}"
# Potentially we could have several running build for the same key (for example for the main branch), so we use improvised lock for this
- name: Acquire Allure lock
if: ${{ inputs.action == 'generate' }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
LOCK_TIMEOUT=300 # seconds
for _ in $(seq 1 5); do
for i in $(seq 1 ${LOCK_TIMEOUT}); do
LOCK_ADDED=$(aws s3api head-object --bucket neon-github-public-dev --key ${LOCK_FILE} | jq --raw-output '.LastModified' || true)
# `date --date="..."` is supported only by gnu date (i.e. it doesn't work on BSD/macOS)
if [ -z "${LOCK_ADDED}" ] || [ "$(( $(date +%s) - $(date --date="${LOCK_ADDED}" +%s) ))" -gt "${LOCK_TIMEOUT}" ]; then
break
fi
sleep 1
done
echo "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" > lock.txt
aws s3 mv --only-show-errors lock.txt "s3://${BUCKET}/${LOCK_FILE}"
# A double-check that exactly WE have acquired the lock
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" ]; then
break
fi
done
- name: Generate and publish final Allure report
if: ${{ inputs.action == 'generate' }}
id: generate-report
env:
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
shell: bash -euxo pipefail {0}
run: |
# Get previously uploaded data for this run
ZSTD_NBTHREADS=0
s3_filepaths=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${RAW_PREFIX}/${GITHUB_RUN_ID}- | jq --raw-output '.Contents[].Key')
if [ -z "$s3_filepaths" ]; then
# There's no previously uploaded data for this run
exit 0
fi
for s3_filepath in ${s3_filepaths}; do
aws s3 cp --only-show-errors "s3://${BUCKET}/${s3_filepath}" "${TEST_OUTPUT}/allure/"
archive=${TEST_OUTPUT}/allure/$(basename $s3_filepath)
mkdir -p ${archive%.tar.zst}
tar -xf ${archive} -C ${archive%.tar.zst}
rm -f ${archive}
done
# Get history trend
aws s3 cp --recursive --only-show-errors "s3://${BUCKET}/${REPORT_PREFIX}/latest/history" "${TEST_OUTPUT}/allure/latest/history" || true
# Generate report
allure generate --clean --output $TEST_OUTPUT/allure/report $TEST_OUTPUT/allure/*
# Replace a logo link with a redirect to the latest version of the report
sed -i 's|<a href="." class=|<a href="https://'${BUCKET}'.s3.amazonaws.com/'${REPORT_PREFIX}'/latest/index.html" class=|g' $TEST_OUTPUT/allure/report/app.js
# Upload a history and the final report (in this particular order to not to have duplicated history in 2 places)
aws s3 mv --recursive --only-show-errors "${TEST_OUTPUT}/allure/report/history" "s3://${BUCKET}/${REPORT_PREFIX}/latest/history"
aws s3 mv --recursive --only-show-errors "${TEST_OUTPUT}/allure/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html
# Generate redirect
cat <<EOF > ${TEST_OUTPUT}/allure/index.html
<!DOCTYPE html>
<meta charset="utf-8">
<title>Redirecting to ${REPORT_URL}</title>
<meta http-equiv="refresh" content="0; URL=${REPORT_URL}">
EOF
aws s3 cp --only-show-errors ${TEST_OUTPUT}/allure/index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY}
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
- name: Release Allure lock
if: ${{ inputs.action == 'generate' && always() }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt || exit 0
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" ]; then
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi
- name: Cleanup
if: always()
shell: bash -euxo pipefail {0}
run: |
rm -rf ${TEST_OUTPUT}/allure
- uses: actions/github-script@v6
if: ${{ inputs.action == 'generate' && always() }}
env:
REPORT_URL: ${{ steps.generate-report.outputs.report-url }}
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
with:
script: |
const { REPORT_URL, BUILD_TYPE, SHA } = process.env
await github.rest.repos.createCommitStatus({
owner: context.repo.owner,
repo: context.repo.repo,
sha: `${SHA}`,
state: 'success',
target_url: `${REPORT_URL}`,
context: `Allure report / ${BUILD_TYPE}`,
})

View File

@@ -197,14 +197,13 @@ runs:
uses: ./.github/actions/upload
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
# The path includes a test name (test_create_snapshot) and directory that the test creates (compatibility_snapshot_pg14), keep the path in sync with the test
path: /tmp/test_output/test_create_snapshot/compatibility_snapshot_pg14/
# Directory is created by test_compatibility.py::test_create_snapshot, keep the path in sync with the test
path: /tmp/test_output/compatibility_snapshot_pg14/
prefix: latest
- name: Create Allure report
- name: Upload test results
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
uses: ./.github/actions/allure-report-store
with:
action: store
build_type: ${{ inputs.build_type }}
test_selection: ${{ inputs.test_selection }}
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}

View File

@@ -93,10 +93,7 @@ jobs:
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -283,10 +280,7 @@ jobs:
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -380,10 +374,7 @@ jobs:
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -476,10 +467,7 @@ jobs:
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -566,16 +554,13 @@ jobs:
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic TPC-H perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
slack-message: "Periodic User example perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -330,6 +330,7 @@ jobs:
fail-fast: false
matrix:
build_type: [ debug, release ]
pg_version: [ v14, v15 ]
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -350,11 +351,12 @@ jobs:
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
rerun_flaky: true
env:
DEFAULT_PG_VERSION: ${{ matrix.pg_version }}
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
uses: ./.github/actions/save-coverage-data
benchmarks:
@@ -399,21 +401,10 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Create Allure report (debug)
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report-debug
uses: ./.github/actions/allure-report
with:
action: generate
build_type: debug
- name: Create Allure report (release)
if: ${{ !cancelled() }}
id: create-allure-report-release
uses: ./.github/actions/allure-report
with:
action: generate
build_type: release
id: create-allure-report
uses: ./.github/actions/allure-report-generate
- uses: actions/github-script@v6
if: >
@@ -423,52 +414,37 @@ jobs:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const reports = [{
buildType: "debug",
reportUrl: "${{ steps.create-allure-report-debug.outputs.report-url }}",
jsonUrl: "${{ steps.create-allure-report-debug.outputs.report-json-url }}",
}, {
buildType: "release",
reportUrl: "${{ steps.create-allure-report-release.outputs.report-url }}",
jsonUrl: "${{ steps.create-allure-report-release.outputs.report-json-url }}",
}]
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const script = require("./scripts/pr-comment-test-report.js")
await script({
github,
context,
fetch,
reports,
report,
})
- name: Store Allure test stat in the DB
if: >
!cancelled() && (
steps.create-allure-report-debug.outputs.report-url ||
steps.create-allure-report-release.outputs.report-url
)
if: ${{ !cancelled() && steps.create-allure-report.outputs.report-json-url }}
env:
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
REPORT_JSON_URL_DEBUG: ${{ steps.create-allure-report-debug.outputs.report-json-url }}
REPORT_JSON_URL_RELEASE: ${{ steps.create-allure-report-release.outputs.report-json-url }}
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
REPORT_JSON_URL: ${{ steps.create-allure-report.outputs.report-json-url }}
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
run: |
./scripts/pysync
for report_url in $REPORT_JSON_URL_DEBUG $REPORT_JSON_URL_RELEASE; do
if [ -z "$report_url" ]; then
continue
fi
curl --fail --output suites.json "${REPORT_JSON_URL}"
export BUILD_TYPE=unified
export DATABASE_URL="$TEST_RESULT_CONNSTR"
if [[ "$report_url" == "$REPORT_JSON_URL_DEBUG" ]]; then
BUILD_TYPE=debug
else
BUILD_TYPE=release
fi
curl --fail --output suites.json "${report_url}"
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
done
poetry run python3 scripts/ingest_regress_test_result.py \
--revision ${COMMIT_SHA} \
--reference ${GITHUB_REF} \
--build-type ${BUILD_TYPE} \
--ingest suites.json
coverage-report:
runs-on: [ self-hosted, gen3, small ]

View File

@@ -48,13 +48,33 @@ pub enum TenantState {
}
impl TenantState {
pub fn has_in_progress_downloads(&self) -> bool {
pub fn attachment_status(&self) -> TenantAttachmentStatus {
use TenantAttachmentStatus::*;
match self {
Self::Loading => true,
Self::Attaching => true,
Self::Active => false,
Self::Stopping => false,
Self::Broken { .. } => false,
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
// So, technically, we can return Attached here.
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading => Attached,
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,
// If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
// However, it also becomes Broken if the regular load fails.
// We would need a separate TenantState variant to distinguish these cases.
// However, there's no practical difference from Console's perspective.
// It will run a Postgres-level health check as soon as it observes Attached.
// That will fail on Broken tenants.
// Console can then rollback the attach, or, wait for operator to fix the Broken tenant.
Self::Broken { .. } => Attached,
// Why is Stopping a Maybe case? Because, during pageserver shutdown,
// we set the Stopping state irrespective of whether the tenant
// has finished attaching or not.
Self::Stopping => Maybe,
}
}
@@ -209,16 +229,25 @@ impl TenantConfigRequest {
}
}
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TenantAttachmentStatus {
Maybe,
Attached,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
// NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
pub state: TenantState,
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub has_in_progress_downloads: Option<bool>,
pub attachment_status: TenantAttachmentStatus,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -691,7 +720,7 @@ mod tests {
id: TenantId::generate(),
state: TenantState::Active,
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
attachment_status: TenantAttachmentStatus::Attached,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -699,7 +728,7 @@ mod tests {
"slug": "Active",
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
"attachment_status": "attached",
});
let original_broken = TenantInfo {
@@ -709,7 +738,7 @@ mod tests {
backtrace: "backtrace info".into(),
},
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
attachment_status: TenantAttachmentStatus::Attached,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),
@@ -721,7 +750,7 @@ mod tests {
}
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
"attachment_status": "attached",
});
assert_eq!(

View File

@@ -146,6 +146,10 @@ pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
// From replication/message.h
pub const XLOG_LOGICAL_MESSAGE: u8 = 0x00;
// From rmgrlist.h
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
@@ -157,6 +161,7 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;

View File

@@ -1,21 +1,21 @@
#!/bin/bash
set -euxo pipefail
PG_BIN=$1
WAL_PATH=$2
DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
SYSID=$(od -A n -j 24 -N 8 -t d8 "$WAL_PATH"/000000010000000000000002* | cut -c 3-)
rm -fr "$DATA_DIR"
env -i LD_LIBRARY_PATH="$PG_BIN"/../lib "$PG_BIN"/initdb -E utf8 -U cloud_admin -D "$DATA_DIR" --sysid="$SYSID"
echo port="$PORT" >> "$DATA_DIR"/postgresql.conf
REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
declare -i WAL_SIZE=$REDO_POS+114
$PG_BIN/pg_ctl -D $DATA_DIR -l logfile start
$PG_BIN/pg_ctl -D $DATA_DIR -l logfile stop -m immediate
cp $DATA_DIR/pg_wal/000000010000000000000001 .
cp $WAL_PATH/* $DATA_DIR/pg_wal/
if [ -f $DATA_DIR/pg_wal/*.partial ]
then
(cd $DATA_DIR/pg_wal ; for partial in \*.partial ; do mv $partial `basename $partial .partial` ; done)
fi
dd if=000000010000000000000001 of=$DATA_DIR/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l logfile start
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l logfile stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001

View File

@@ -1,20 +0,0 @@
PG_BIN=$1
WAL_PATH=$2
DATA_DIR=$3
PORT=$4
SYSID=`od -A n -j 24 -N 8 -t d8 $WAL_PATH/000000010000000000000002* | cut -c 3-`
rm -fr $DATA_DIR /tmp/pg_wals
mkdir /tmp/pg_wals
env -i LD_LIBRARY_PATH=$PG_BIN/../lib $PG_BIN/initdb -E utf8 -U cloud_admin -D $DATA_DIR --sysid=$SYSID
echo port=$PORT >> $DATA_DIR/postgresql.conf
REDO_POS=0x`$PG_BIN/pg_controldata -D $DATA_DIR | fgrep "REDO location"| cut -c 42-`
declare -i WAL_SIZE=$REDO_POS+114
cp $WAL_PATH/* /tmp/pg_wals
if [ -f $DATA_DIR/pg_wal/*.partial ]
then
(cd /tmp/pg_wals ; for partial in \*.partial ; do mv $partial `basename $partial .partial` ; done)
fi
dd if=$DATA_DIR/pg_wal/000000010000000000000001 of=/tmp/pg_wals/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
echo > $DATA_DIR/recovery.signal
rm -f $DATA_DIR/pg_wal/*
echo "restore_command = 'cp /tmp/pg_wals/%f %p'" >> $DATA_DIR/postgresql.conf

View File

@@ -60,28 +60,43 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Primitive for coalescing operations into a single task which will not be cancelled by for
/// example external http client closing the connection.
pub mod shared_retryable;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {
($name:literal) => {{
let should_sleep: Option<std::time::Duration> = (|| {
fail::fail_point!($name, |v: Option<_>| {
let millis = v.unwrap().parse::<u64>().unwrap();
Some(Duration::from_millis(millis))
});
None
})();
if let Some(d) = should_sleep {
tracing::info!("failpoint {:?}: sleeping for {:?}", $name, d);
tokio::time::sleep(d).await;
tracing::info!("failpoint {:?}: sleep done", $name);
}
}};
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
/// tokio sleep function. Another difference is that we print lines to the log,
/// which can be useful in tests to check that the failpoint was hit.
#[macro_export]
macro_rules! failpoint_sleep_millis_async {
($name:literal) => {{
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
// Sleep if the action was a returned value
if let ::std::option::Option::Some(duration_str) = should_sleep {
$crate::failpoint_sleep_helper($name, duration_str).await
}
}};
}
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
tokio::time::sleep(d).await;
tracing::info!("failpoint {:?}: sleep done", name);
}
}
pub use failpoint_macro_helpers::failpoint_sleep_helper;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///

View File

@@ -1,659 +0,0 @@
use std::future::Future;
use std::sync::Arc;
/// Container using which many request handlers can come together and join a single task to
/// completion instead of racing each other and their own cancellation.
///
/// In a picture:
///
/// ```text
/// SharedRetryable::try_restart Spawned task completes with only one concurrent attempt
/// \ /
/// request handler 1 ---->|--X
/// request handler 2 ---->|-------|
/// request handler 3 ---->|-------|
/// | |
/// v |
/// one spawned task \------>/
///
/// (X = cancelled during await)
/// ```
///
/// Implementation is cancel safe. Implementation and internal structure are hurt by the inability
/// to just spawn the task, but this is needed for pageserver usage.
///
/// Implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires the
/// caller to do the spawning before awaiting for the result. If the caller is dropped while this
/// happens, a new attempt will be required, and all concurrent awaiters will see a
/// [`RetriedTaskPanicked`] error.
///
/// There is another "family of APIs" [`SharedRetryable::attempt_spawn`] for infallible futures. It is
/// just provided for completeness, and it does not have a fully decomposed version like
/// `try_restart`.
///
/// For `try_restart_*` family of APIs, there is a concept of two leveled results. The inner level
/// is returned by the executed future. It needs to be `Clone`. Most errors are not `Clone`, so
/// implementation advice is to log the happened error, and not propagate more than a label as the
/// "inner error" which will be used to build an outer error. The outer error will also have to be
/// convertable from [`RetriedTaskPanicked`] to absorb that case as well.
///
/// ## Example
///
/// A shared service value completes the infallible work once, even if called concurrently by
/// multiple cancellable tasks.
///
/// Example moved as a test `service_example`.
#[derive(Clone)]
pub struct SharedRetryable<V> {
inner: Arc<tokio::sync::Mutex<Option<MaybeDone<V>>>>,
}
impl<V> Default for SharedRetryable<V> {
fn default() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(None)),
}
}
}
/// Determine if an error is transient or permanent.
pub trait Retryable {
fn is_permanent(&self) -> bool {
true
}
}
pub trait MakeFuture {
type Future: Future<Output = Self::Output> + Send + 'static;
type Output: Send + 'static;
fn make_future(self) -> Self::Future;
}
impl<Fun, Fut, R> MakeFuture for Fun
where
Fun: FnOnce() -> Fut,
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
type Future = Fut;
type Output = R;
fn make_future(self) -> Self::Future {
self()
}
}
/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]).
#[derive(Debug, PartialEq, Eq)]
pub struct RetriedTaskPanicked;
impl<T, E1> SharedRetryable<Result<T, E1>>
where
T: Clone + std::fmt::Debug + Send + 'static,
E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
{
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and and get the terminal result from an earlier attempt
/// or start a new attempt, or join an existing one.
///
/// Compared to `Self::try_restart`, this method also spawns the future to run, which would
/// otherwise have to be done manually.
#[cfg(test)]
pub async fn try_restart_spawn<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
let (recv, maybe_fut) = self.try_restart(retry_with).await;
if let Some(fut) = maybe_fut {
// top level function, we must spawn, pageserver cannot use this
tokio::spawn(fut);
}
recv.await
}
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt or
/// start a new attempt, or join an existing one.
///
/// If a task calling this method is cancelled, at worst, a new attempt which is immediatedly
/// deemed as having panicked will happen, but without a panic ever happening.
///
/// Returns one future for waiting for the result and possibly another which needs to be
/// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first
/// future will never make progress.
///
/// This complication exists because on pageserver we cannot use `tokio::spawn` directly
/// at this time.
pub async fn try_restart<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> (
impl Future<Output = Result<T, E2>> + Send + 'static,
Option<impl Future<Output = ()> + Send + 'static>,
)
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
use futures::future::Either;
match self.decide_to_retry_or_join(retry_with).await {
Ok(terminal) => (Either::Left(async move { terminal }), None),
Err((rx, maybe_fut)) => {
let recv = Self::make_oneshot_alike_receiver(rx);
(Either::Right(recv), maybe_fut)
}
}
}
/// Returns a Ok if the previous attempt had resulted in a terminal result. Err is returned
/// when an attempt can be joined and possibly needs to be spawned.
async fn decide_to_retry_or_join<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<
Result<T, E2>,
(
tokio::sync::broadcast::Receiver<Result<T, E1>>,
Option<impl Future<Output = ()> + Send + 'static>,
),
>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())),
Some(MaybeDone::Done(Err(e))) if e.is_permanent() => {
return Ok(Err(E2::from(e.to_owned())))
}
Some(MaybeDone::Pending(weak)) => {
// failure to upgrade can mean only one thing: there was an unexpected
// panic which we consider as a transient retryable error.
weak.upgrade()
}
Some(MaybeDone::Done(Err(_retryable))) => None,
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
// new attempt
// panic safety: invoke the factory before configuring the pending value
let fut = retry_with.make_future();
let (strong, fut) = self.make_run_and_complete(fut, &mut g);
(strong, Some(fut))
}
};
// important: the Arc<Receiver> is not held after unlocking
// important: we resubscribe before lock is released to be sure to get a message which
// is sent once receiver is dropped
let rx = strong.resubscribe();
drop(strong);
Err((rx, maybe_fut))
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Returns an `Arc<Receiver<V>>` which is valid until the attempt completes, and the future
/// which will need to run to completion outside the lifecycle of the caller.
fn make_run_and_complete(
&self,
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<Result<T, E1>>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
impl Future<Output = ()> + Send + 'static,
) {
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
assert!(
weak.upgrade().is_none(),
"when starting a restart, should no longer have an upgradeable channel"
);
}
Some(MaybeDone::Done(Err(err))) => {
assert!(
!err.is_permanent(),
"when restarting, the err must be transient"
);
}
Some(MaybeDone::Done(Ok(_))) => {
panic!("unexpected restart after a completion on MaybeDone");
}
None => {}
}
self.make_run_and_complete_any(fut, g)
}
/// Oneshot alike as in it's a future which will be consumed by an `await`.
///
/// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple
/// times.
async fn make_oneshot_alike_receiver<E2>(
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(E2::from(RetriedTaskPanicked))
}
}
}
}
impl<V> SharedRetryable<V>
where
V: std::fmt::Debug + Clone + Send + 'static,
{
/// Attempt to run once a spawned future to completion.
///
/// Any previous attempt which panicked will be retried, but the `RetriedTaskPanicked` will be
/// returned when the most recent attempt panicked.
#[cfg(test)]
pub async fn attempt_spawn(
&self,
attempt_with: impl MakeFuture<Output = V>,
) -> Result<V, RetriedTaskPanicked> {
let (rx, maybe_fut) = {
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(v)) => return Ok(v.to_owned()),
Some(MaybeDone::Pending(weak)) => {
// see comment in try_restart
weak.upgrade()
}
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
let fut = attempt_with.make_future();
let (strong, fut) = self.make_run_and_complete_any(fut, &mut g);
(strong, Some(fut))
}
};
// see decide_to_retry_or_join for important notes
let rx = strong.resubscribe();
drop(strong);
(rx, maybe_fut)
};
if let Some(fut) = maybe_fut {
// this is a top level function, need to spawn directly
// from pageserver one wouldn't use this but more piecewise functions
tokio::spawn(fut);
}
let recv = Self::make_oneshot_alike_receiver_any(rx);
recv.await
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Forgetting the returned future is outside of scope of any correctness guarantees; all of
/// the waiters will then be deadlocked, and the MaybeDone will forever be pending. Dropping
/// and not running the future will then require a new attempt.
///
/// Also returns an `Arc<Receiver<V>>` which is valid until the attempt completes.
fn make_run_and_complete_any(
&self,
fut: impl Future<Output = V> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<V>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<V>>,
impl Future<Output = ()> + Send + 'static,
) {
let (tx, rx) = tokio::sync::broadcast::channel(1);
let strong = Arc::new(rx);
**g = Some(MaybeDone::Pending(Arc::downgrade(&strong)));
let retry = {
let strong = strong.clone();
self.clone().run_and_complete(fut, tx, strong)
};
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
let rx = weak.upgrade().expect("holding the weak and strong locally");
assert!(Arc::ptr_eq(&strong, &rx));
}
_ => unreachable!("MaybeDone::pending must be set after spawn_and_run_complete_any"),
}
(strong, retry)
}
/// Run the actual attempt, and communicate the response via both:
/// - setting the `MaybeDone::Done`
/// - the broadcast channel
async fn run_and_complete(
self,
fut: impl Future<Output = V>,
tx: tokio::sync::broadcast::Sender<V>,
strong: Arc<tokio::sync::broadcast::Receiver<V>>,
) {
let res = fut.await;
{
let mut g = self.inner.lock().await;
MaybeDone::complete(&mut *g, &strong, res.clone());
// make the weak un-upgradeable by dropping the final alive
// reference to it. it is final Arc because the Arc never escapes
// the critical section in `decide_to_retry_or_join` or `attempt_spawn`.
Arc::try_unwrap(strong).expect("expected this to be the only Arc<Receiver<V>>");
}
// now no one can get the Pending(weak) value to upgrade and they only see
// the Done(res).
//
// send the result value to listeners, if any
drop(tx.send(res));
}
#[cfg(test)]
async fn make_oneshot_alike_receiver_any(
mut rx: tokio::sync::broadcast::Receiver<V>,
) -> Result<V, RetriedTaskPanicked> {
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(RetriedTaskPanicked)
}
}
}
}
/// MaybeDone handles synchronization for multiple requests and the single actual task.
///
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
/// useful `recv().await`, where useful means "value" or "disconnect" arrives. If upgrade fails,
/// this means that "disconnect" has happened in the past.
///
/// On successful execution the one executing task will set this to `Done` variant, with the actual
/// resulting value.
#[derive(Debug)]
pub enum MaybeDone<V> {
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
Done(V),
}
impl<V: std::fmt::Debug> MaybeDone<V> {
fn complete(
this: &mut Option<MaybeDone<V>>,
_strong: &Arc<tokio::sync::broadcast::Receiver<V>>,
outcome: V,
) {
#[cfg(debug_assertions)]
match this {
Some(MaybeDone::Pending(weak)) => {
let same = weak
.upgrade()
// we don't yet have Receiver::same_channel
.map(|rx| Arc::ptr_eq(_strong, &rx))
.unwrap_or(false);
assert!(same, "different channel had been replaced or dropped");
}
other => panic!("unexpected MaybeDone: {other:?}"),
}
*this = Some(MaybeDone::Done(outcome));
}
}
#[cfg(test)]
mod tests {
use super::{RetriedTaskPanicked, Retryable, SharedRetryable};
use std::sync::Arc;
#[derive(Debug)]
enum OuterError {
AttemptPanicked,
Unlucky,
}
#[derive(Clone, Debug)]
enum InnerError {
Unlucky,
}
impl Retryable for InnerError {
fn is_permanent(&self) -> bool {
false
}
}
impl From<InnerError> for OuterError {
fn from(_: InnerError) -> Self {
OuterError::Unlucky
}
}
impl From<RetriedTaskPanicked> for OuterError {
fn from(_: RetriedTaskPanicked) -> Self {
OuterError::AttemptPanicked
}
}
#[tokio::test]
async fn restartable_until_permanent() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let res = shr
.try_restart_spawn(|| async move { panic!("really unlucky") })
.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)));
let res = shr
.try_restart_spawn(|| async move { Err(InnerError::Unlucky) })
.await;
assert!(matches!(res, Err(OuterError::Unlucky)));
let res = shr.try_restart_spawn(|| async move { Ok(42) }).await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
let res = shr
.try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") })
.await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
}
/// Demonstration of the SharedRetryable::attempt
#[tokio::test]
async fn attemptable_until_no_panic() {
let shr = SharedRetryable::<u8>::default();
let res = shr
.attempt_spawn(|| async move { panic!("should not interfere") })
.await;
assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}");
let res = shr.attempt_spawn(|| async move { 42 }).await;
assert_eq!(res, Ok(42));
let res = shr
.attempt_spawn(|| async move { panic!("should not be called") })
.await;
assert_eq!(res, Ok(42));
}
#[tokio::test]
async fn cancelling_spawner_is_fine() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let (recv1, maybe_fut) = shr
.try_restart(|| async move { panic!("should not have been called") })
.await;
let should_be_spawned = maybe_fut.unwrap();
let (recv2, maybe_fut) = shr
.try_restart(|| async move {
panic!("should never be called because waiting on should_be_spawned")
})
.await;
assert!(
matches!(maybe_fut, None),
"only the first one should had created the future"
);
let mut recv1 = std::pin::pin!(recv1);
let mut recv2 = std::pin::pin!(recv2);
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {},
_ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"),
_ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"),
}
drop(should_be_spawned);
let res = recv1.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
let res = recv2.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
// but we can still reach a terminal state if the api is not misused or the
// should_be_spawned winner is not cancelled
let recv1 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(42) });
let recv2 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(43) });
assert_eq!(recv1.await.unwrap(), 42);
assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned");
}
#[tokio::test]
async fn service_example() {
#[derive(Debug, Clone, Copy)]
enum OneLevelError {
TaskPanicked,
}
impl Retryable for OneLevelError {
fn is_permanent(&self) -> bool {
// for a single level errors, this wording is weird
!matches!(self, OneLevelError::TaskPanicked)
}
}
impl From<RetriedTaskPanicked> for OneLevelError {
fn from(_: RetriedTaskPanicked) -> Self {
OneLevelError::TaskPanicked
}
}
#[derive(Clone, Default)]
struct Service(SharedRetryable<Result<u8, OneLevelError>>);
impl Service {
async fn work(
&self,
completions: Arc<std::sync::atomic::AtomicUsize>,
) -> Result<u8, OneLevelError> {
self.0
.try_restart_spawn(|| async move {
// give time to cancel some of the tasks
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self::work_once().await
})
.await
}
async fn work_once() -> Result<u8, OneLevelError> {
Ok(42)
}
}
let svc = Service::default();
let mut js = tokio::task::JoinSet::new();
let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1));
let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let handles = (0..10)
.map(|_| {
js.spawn({
let svc = svc.clone();
let barrier = barrier.clone();
let completions = completions.clone();
async move {
// make sure all tasks are ready to start at the same time
barrier.wait().await;
// after successfully starting the work, any of the futures could get cancelled
svc.work(completions).await
}
})
})
.collect::<Vec<_>>();
barrier.wait().await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
handles[5].abort();
let mut cancellations = 0;
while let Some(res) = js.join_next().await {
// all complete with the same result
match res {
Ok(res) => assert_eq!(res.unwrap(), 42),
Err(je) => {
// except for the one task we cancelled; it's cancelling
// does not interfere with the result
assert!(je.is_cancelled());
cancellations += 1;
assert_eq!(cancellations, 1, "only 6th task was aborted");
// however we cannot assert that everytime we get to cancel the 6th task
}
}
}
// there will be at most one terminal completion
assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}

View File

@@ -5,7 +5,7 @@
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use anyhow;
use chrono::Utc;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
@@ -164,7 +164,8 @@ pub async fn collect_metrics_iteration(
timeline_written_size,
));
match timeline.get_current_logical_size(ctx) {
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
// Only send timeline logical size when it is fully calculated.
Ok((size, is_exact)) if is_exact => {
current_metrics.push((
@@ -334,7 +335,9 @@ pub async fn calculate_synthetic_size_worker(
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
{
if let Err(e) = tenant.calculate_synthetic_size(ctx).await {
if let Err(e) = tenant.calculate_synthetic_size(
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
ctx).await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}
}

View File

@@ -346,23 +346,23 @@ paths:
starts writing to the tenant's S3 state unless it receives one of the
distinguished errors below that state otherwise.
The method to identify whether a request has arrived at the pageserver, and
whether it has succeeded, is to poll for the tenant status to reach "Active"
or "Broken" state. These values are currently not explicitly documented in
the API spec.
Polling for `has_in_progress_downloads == false` is INCORRECT because that
value can turn `false` during shutdown while the Attach operation is still
unfinished.
If a client receives a not-distinguished response, e.g., a network timeout,
it MUST retry the /attach request and poll again for the tenant's
attachment status.
After the client has received a 202, it MUST poll the tenant's
attachment status (field `attachment_status`) to reach state `attached`.
If the `attachment_status` is missing, the client MUST retry the `/attach`
request (goto previous paragraph). This is a robustness measure in case the tenant
status endpoint is buggy, but the attach operation is ongoing.
There is no way to cancel an in-flight request.
If a client receives a not-distinguished response, e.g., a network timeout,
it MUST retry the /attach request and poll again for tenant status.
In any case, it must
* NOT ASSUME that the /attach request has been lost in the network,
* NOT ASSUME that the request has been lost based on a subsequent
tenant status request returning 404. (The request may still be in flight!)
In any case, the client
* MUST NOT ASSUME that the /attach request has been lost in the network,
* MUST NOT ASSUME that the request has been lost, based on the observation
that a subsequent tenant status request returns 404. The request may
still be in flight. It must be retried.
responses:
"202":
description: Tenant attaching scheduled
@@ -888,13 +888,27 @@ components:
type: object
required:
- id
- attachment_status
properties:
id:
type: string
current_physical_size:
type: integer
has_in_progress_downloads:
type: boolean
attachment_status:
description: |
Status of this tenant's attachment to this pageserver.
- `maybe` means almost nothing, don't read anything into it
except for the fact that the pageserver _might_ be already
writing to the tenant's S3 state, so, DO NOT ATTACH the
tenant to any other pageserver, or we risk split-brain.
- `attached` means that the attach operation has completed,
maybe successfully, maybe not. Perform a health check at
the Postgres level to determine healthiness of the tenant.
See the tenant `/attach` endpoint for more information.
type: string
enum: [ "maybe", "attached" ]
TenantCreateInfo:
type: object
properties:

View File

@@ -25,7 +25,7 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
@@ -105,6 +105,9 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::AncestorStopping(_) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
@@ -169,6 +172,8 @@ async fn build_timeline_info(
include_non_incremental_logical_size: bool,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
@@ -191,6 +196,7 @@ fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
@@ -263,25 +269,28 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
&ctx,
)
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
&ctx,
)
.await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
Ok(None) => json_response(StatusCode::CONFLICT, ()), // timeline already exists
Err(err) => Err(ApiError::InternalServerError(err)),
}
Ok(None) => json_response(StatusCode::CONFLICT, ()), // timeline already exists
Err(err) => Err(ApiError::InternalServerError(err)),
}
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -303,6 +312,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
@@ -467,7 +477,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
id: *id,
state: state.clone(),
current_physical_size: None,
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
attachment_status: state.attachment_status(),
})
.collect::<Vec<TenantInfo>>();
@@ -492,7 +502,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
attachment_status: state.attachment_status(),
})
}
.instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
@@ -527,7 +537,11 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
// this can be long operation
let inputs = tenant
.gather_size_inputs(retention_period, &ctx)
.gather_size_inputs(
retention_period,
LogicalSizeCalculationCause::TenantSizeHandler,
&ctx,
)
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -5,7 +5,7 @@ use std::ops::Range;
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct KeySpace {
/// Contiguous ranges of keys that belong to the key space. In key order,
/// and with no overlap.
@@ -61,6 +61,58 @@ impl KeySpace {
KeyPartitioning { parts }
}
///
/// Calculate logical size of delta layers: total size of all blocks covered by it's key range
///
pub fn get_logical_size(&self, range: &Range<Key>) -> u64 {
let mut start_key = range.start;
let n_ranges = self.ranges.len();
let start_index = match self.ranges.binary_search_by_key(&start_key, |r| r.start) {
Ok(index) => index, // keyspace range starts with start_key
Err(index) => {
if index != 0 && self.ranges[index - 1].end > start_key {
index - 1 // previous keyspace range overlaps with specified
} else if index == n_ranges {
return 0; // no intersection with specified range
} else {
start_key = self.ranges[index].start;
index
}
}
};
let mut size = 0u64;
for i in start_index..n_ranges {
if self.ranges[i].start >= range.end {
break;
}
let end_key = if self.ranges[i].end < range.end {
self.ranges[i].end
} else {
range.end
};
let n_blocks = key_range_size(&(start_key..end_key));
if n_blocks != u32::MAX {
size += n_blocks as u64 * BLCKSZ as u64;
}
if i + 1 < n_ranges {
start_key = self.ranges[i + 1].start;
}
}
size
}
///
/// Check if key space contains overlapping range
///
pub fn overlaps(&self, range: &Range<Key>) -> bool {
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
Ok(0) => false,
Err(0) => false,
Ok(index) => self.ranges[index - 1].end > range.start,
Err(index) => self.ranges[index - 1].end > range.start,
}
}
}
///
@@ -129,3 +181,226 @@ impl KeySpaceAccum {
}
}
}
///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. Key ranges may be inserted in any order and can overlap.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceRandomAccum {
ranges: Vec<Range<Key>>,
}
impl KeySpaceRandomAccum {
pub fn new() -> Self {
Self { ranges: Vec::new() }
}
pub fn add_key(&mut self, key: Key) {
self.add_range(singleton_range(key))
}
pub fn add_range(&mut self, range: Range<Key>) {
self.ranges.push(range);
}
pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {
self.ranges.sort_by_key(|r| r.start);
let mut start = self.ranges.first().unwrap().start;
let mut end = self.ranges.first().unwrap().end;
for r in self.ranges {
assert!(r.start >= start);
if r.start > end {
ranges.push(start..end);
start = r.start;
end = r.end;
} else if r.end > end {
end = r.end;
}
}
ranges.push(start..end);
}
KeySpace { ranges }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fmt::Write;
// Helper function to create a key range.
//
// Make the tests below less verbose.
fn kr(irange: Range<i128>) -> Range<Key> {
Key::from_i128(irange.start)..Key::from_i128(irange.end)
}
#[allow(dead_code)]
fn dump_keyspace(ks: &KeySpace) {
for r in ks.ranges.iter() {
println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
}
}
fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
if actual.ranges != expected {
let mut msg = String::new();
writeln!(msg, "expected:").unwrap();
for r in &expected {
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
}
writeln!(msg, "got:").unwrap();
for r in &actual.ranges {
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
}
panic!("{}", msg);
}
}
#[test]
fn keyspace_add_range() {
// two separate ranges
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(20..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
// two separate ranges, added in reverse order
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(20..30));
ks.add_range(kr(0..10));
// add range that is adjacent to the end of an existing range
//
// #####
// #####
ks.add_range(kr(0..10));
ks.add_range(kr(10..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that is adjacent to the start of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..30));
ks.add_range(kr(0..10));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that overlaps with the end of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(5..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that overlaps with the start of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(5..30));
ks.add_range(kr(0..10));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that is fully covered by an existing range
//
// #########
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..30));
ks.add_range(kr(10..20));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that extends an existing range from both ends
//
// #####
// #########
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..20));
ks.add_range(kr(0..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add a range that overlaps with two existing ranges, joining them
//
// ##### #####
// #######
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(20..30));
ks.add_range(kr(5..25));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
}
#[test]
fn keyspace_overlaps() {
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..20));
ks.add_range(kr(30..40));
let ks = ks.to_keyspace();
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(0..5)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(5..9)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(5..10)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(5..11)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(10..15)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(15..20)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(15..25)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(22..28)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(25..30)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(35..35)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(40..45)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(45..50)));
// ##### #####
// xxxxxxxxxxx
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
}
}

View File

@@ -30,6 +30,7 @@ const STORAGE_TIME_OPERATIONS: &[&str] = &[
"create images",
"init logical size",
"logical size",
"imitate logical size",
"load layer map",
"gc",
];
@@ -186,6 +187,16 @@ static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_eviction_iteration_duration_seconds_global",
"Time spent on a single eviction iteration",
&["period_secs", "threshold_secs"],
STORAGE_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static EVICTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_evictions",
@@ -478,6 +489,15 @@ pub static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Failed to register tenant_task_events metric")
});
pub static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_background_loop_period_overrun_count",
"Incremented whenever warn_when_period_overrun() logs a warning.",
&["task", "period"],
)
.expect("failed to define a metric")
});
// walreceiver metrics
pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy<IntCounter> = Lazy::new(|| {
@@ -688,6 +708,7 @@ pub struct TimelineMetrics {
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
pub imitate_logical_size_histo: StorageTimeMetrics,
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
@@ -720,6 +741,8 @@ impl TimelineMetrics {
let create_images_time_histo =
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
let imitate_logical_size_histo =
StorageTimeMetrics::new("imitate logical size", &tenant_id, &timeline_id);
let load_layer_map_histo =
StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
@@ -756,6 +779,7 @@ impl TimelineMetrics {
compact_time_histo,
create_images_time_histo,
logical_size_histo,
imitate_logical_size_histo,
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,
@@ -1216,4 +1240,7 @@ pub fn preinitialize_metrics() {
// Initialize it eagerly, so that our alert rule can distinguish absence of the metric from metric value 0.
assert_eq!(UNEXPECTED_ONDEMAND_DOWNLOADS.get(), 0);
UNEXPECTED_ONDEMAND_DOWNLOADS.reset();
// Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels.
BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset();
}

View File

@@ -256,7 +256,10 @@ async fn page_service_conn_main(
//
// no write timeout is used, because the kernel is assumed to error writes after some time.
let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
socket.set_timeout(Some(std::time::Duration::from_secs(60 * 10)));
// timeout should be lower, but trying out multiple days for
// <https://github.com/neondatabase/neon/issues/4205>
socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
let socket = std::pin::pin!(socket);
// XXX: pgbackend.run() should take the connection_ctx,

View File

@@ -500,6 +500,8 @@ impl Timeline {
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;

View File

@@ -272,9 +272,6 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
/// Task which is the only task to delete this particular timeline
DeleteTimeline,
}
#[derive(Default)]

View File

@@ -97,7 +97,10 @@ mod timeline;
pub mod size;
pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -456,50 +459,6 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error, Clone)]
enum InnerDeleteTimelineError {
#[error("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")]
QueueUninitialized,
#[error("failpoint: {0}")]
Failpoint(&'static str),
#[error("failed to remove local timeline directory")]
FailedToRemoveLocalTimelineDirectory,
#[error("index_part.json upload failed")]
UploadFailed,
#[error("the deleted timeline grew branches while deleting it; tenant should now be broken")]
DeletedGrewChildren,
}
impl utils::shared_retryable::Retryable for InnerDeleteTimelineError {
fn is_permanent(&self) -> bool {
use InnerDeleteTimelineError::*;
match self {
QueueUninitialized => false,
Failpoint(_) => false,
FailedToRemoveLocalTimelineDirectory => false,
UploadFailed => false,
DeletedGrewChildren => true,
}
}
}
impl From<InnerDeleteTimelineError> for DeleteTimelineError {
fn from(value: InnerDeleteTimelineError) -> Self {
DeleteTimelineError::Other(anyhow::Error::new(value))
}
}
impl From<utils::shared_retryable::RetriedTaskPanicked> for DeleteTimelineError {
fn from(_: utils::shared_retryable::RetriedTaskPanicked) -> Self {
DeleteTimelineError::Other(anyhow::anyhow!("deleting timeline failed, please retry"))
}
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -637,16 +596,19 @@ impl Tenant {
/// finishes. You can use wait_until_active() to wait for the task to
/// complete.
///
pub fn spawn_attach(
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Arc<Tenant> {
) -> anyhow::Result<Arc<Tenant>> {
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
let tenant_conf = TenantConfOpt::default();
Self::attach_idempotent_create_marker_file(conf, tenant_id)
.context("create attach marker file")?;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
@@ -679,7 +641,46 @@ impl Tenant {
Ok(())
},
);
tenant
Ok(tenant)
}
fn attach_idempotent_create_marker_file(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
// to determine whether a tenant has finished attaching.
let tenant_dir = conf.tenant_path(&tenant_id);
let marker_file = conf.tenant_attaching_mark_file_path(&tenant_id);
debug_assert_eq!(marker_file.parent().unwrap(), tenant_dir);
// TODO: should use tokio::fs here, but
// 1. caller is not async, for good reason (it holds tenants map lock)
// 2. we'd need to think about cancel safety. Turns out dropping a tokio::fs future
// doesn't wait for the activity in the fs thread pool.
crashsafe::create_dir_all(&tenant_dir).context("create tenant directory")?;
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&marker_file)
{
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
// Either this is a retry of attach or there is a concurrent task also doing attach for this tenant.
// We cannot distinguish this here.
// The caller is responsible for ensuring there's no concurrent attach for a tenant.
{} // fsync again, we don't know if that already happened
}
err => {
err.context("create tenant attaching marker file")?;
unreachable!("we covered the Ok() case above");
}
}
crashsafe::fsync_file_and_parent(&marker_file)
.context("fsync tenant attaching marker file and parent")?;
debug_assert!(tenant_dir.is_dir());
debug_assert!(marker_file.is_file());
Ok(())
}
///
@@ -687,26 +688,15 @@ impl Tenant {
///
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
// to determine whether a tenant has finished attaching.
let tenant_dir = self.conf.tenant_path(&self.tenant_id);
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
debug_assert_eq!(marker_file.parent().unwrap(), tenant_dir);
if tenant_dir.exists() {
if !marker_file.is_file() {
anyhow::bail!(
"calling Tenant::attach with a tenant directory that doesn't have the attaching marker file:\ntenant_dir: {}\nmarker_file: {}",
tenant_dir.display(), marker_file.display());
}
} else {
crashsafe::create_dir_all(&tenant_dir).context("create tenant directory")?;
fs::File::create(&marker_file).context("create tenant attaching marker file")?;
crashsafe::fsync_file_and_parent(&marker_file)
.context("fsync tenant attaching marker file and parent")?;
if !tokio::fs::try_exists(&marker_file)
.await
.context("check for existence of marker file")?
{
anyhow::bail!(
"implementation error: marker file should exist at beginning of this function"
);
}
debug_assert!(tenant_dir.is_dir());
debug_assert!(marker_file.is_file());
// Get list of remote timelines
// download index files for every tenant timeline
@@ -883,11 +873,15 @@ impl Tenant {
}
/// Create a placeholder Tenant object for a broken tenant
pub fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
pub fn create_broken_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
reason: String,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
Arc::new(Tenant::new(
TenantState::Broken {
reason: "create_broken_tenant".into(),
reason,
backtrace: String::new(),
},
conf,
@@ -920,7 +914,7 @@ impl Tenant {
Ok(conf) => conf,
Err(e) => {
error!("load tenant config failed: {:?}", e);
return Tenant::create_broken_tenant(conf, tenant_id);
return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
}
};
@@ -1270,8 +1264,24 @@ impl Tenant {
"Cannot create timelines on inactive tenant"
);
if self.get_timeline(new_timeline_id, false).is_ok() {
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
debug!("timeline {new_timeline_id} already exists");
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
return Ok(None);
}
@@ -1313,6 +1323,17 @@ impl Tenant {
}
};
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
remote_client.wait_completion().await.with_context(|| {
format!("wait for {} timeline initial uploads to complete", kind)
})?;
}
Ok(Some(loaded_timeline))
}
@@ -1405,7 +1426,7 @@ impl Tenant {
/// Removes timeline-related in-memory data
pub async fn delete_timeline(
self: &Arc<Tenant>,
&self,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
@@ -1438,196 +1459,162 @@ impl Tenant {
timeline
};
let span = tracing::Span::current();
// if we have concurrent requests, we will only execute one version of following future;
// initially it did not have any means to be cancelled.
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// NOTE: Unlike "the usual" futures, this one will log any errors instead of just propagating
// them to the caller. This is because this one future produces a value, which will need to
// be cloneable to everyone interested, and normal `std::error::Error` are not clonable.
// Also, it wouldn't make sense to log the same failure multiple times, it would look like
// multiple failures to anyone reading the logs.
let factory = || {
let tenant = self.clone();
let tenant_id = self.tenant_id;
let timeline = timeline.clone();
let timeline_id = timeline.timeline_id;
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
async move {
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
warn!("failed to stop RemoteTimelineClient due to uninitialized queue");
return Err(InnerDeleteTimelineError::QueueUninitialized);
}
},
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
}
}
// Stop & wait for the remaining timeline tasks, including upload tasks.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
warn!("upload failed: {e}");
return Err(InnerDeleteTimelineError::UploadFailed);
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let _layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by us earlier.
let local_timeline_directory =
tenant.conf.timeline_path(&timeline_id, &tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"failpoint: timeline-delete-before-rm",
))
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = tenant.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
}
Err(e) => {
warn!(
"failed to remove local timeline directory {}: {e:#}",
local_timeline_directory.display()
);
return Err(
InnerDeleteTimelineError::FailedToRemoveLocalTimelineDirectory,
);
}
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-after-rm",
))
});
// Remove the timeline from the map or poison it if we've grown children.
let removed_timeline =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines.iter().any(|(_, entry)| {
entry.get_ancestor_timeline_id() == Some(timeline_id)
});
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id)
}));
match removed_timeline {
Ok(Some(_)) => {}
Ok(None) => {
// with SharedRetryable this should no longer happen
warn!("no other task should had dropped the Timeline");
}
Err(_panic) => return Err(InnerDeleteTimelineError::DeletedGrewChildren),
}
Ok(())
}
// execute in the *winner's* span so we will capture the request id etc.
.instrument(span)
};
let (recv, maybe_fut) = timeline.delete_self.try_restart(factory).await;
if let Some(fut) = maybe_fut {
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::DeleteTimeline,
Some(self.tenant_id()),
None,
&format!("delete_timeline {}", timeline.timeline_id),
false,
async move {
fut.await;
Ok(())
},
);
}
}
recv.await
// Stop & wait for the remaining timeline tasks, including upload tasks.
// NB: This and other delete_timeline calls do not run as a task_mgr task,
// so, they are not affected by this shutdown_tasks() call.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = self.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
// continue with the rest of the deletion
}
res => res.with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?,
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
}
pub fn current_state(&self) -> TenantState {
@@ -2208,7 +2195,7 @@ impl Tenant {
// made.
break;
}
let result = timeline.gc().await?;
let result = timeline.gc(ctx).await?;
totals += result;
}
@@ -2416,17 +2403,18 @@ impl Tenant {
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let mut timelines = self.timelines.lock().unwrap();
let new_timeline = self
.prepare_timeline(
let new_timeline = {
let mut timelines = self.timelines.lock().unwrap();
self.prepare_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true, true)?;
drop(timelines);
.initialize_with_lock(ctx, &mut timelines, true, true)?
};
// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
@@ -2684,6 +2672,7 @@ impl Tenant {
// `max_retention_period` overrides the cutoff that is used to calculate the size
// (only if it is shorter than the real cutoff).
max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
@@ -2705,6 +2694,7 @@ impl Tenant {
logical_sizes_at_once,
max_retention_period,
&mut shared_cache,
cause,
ctx,
)
.await
@@ -2714,8 +2704,12 @@ impl Tenant {
/// This is periodically called by background worker.
/// result is cached in tenant struct
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, ctx).await?;
pub async fn calculate_synthetic_size(
&self,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, cause, ctx).await?;
let size = inputs.calculate()?;
@@ -3475,14 +3469,26 @@ mod tests {
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.await?;
// The branchpoints should contain all timelines, even ones marked
// as Broken.
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
}
// You can read the key from the child branch even though the parent is
// Broken, as long as you don't need to access data from the parent.
assert_eq!(
newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
TEST_IMG(&format!("foo at {}", Lsn(0x40)))
newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
TEST_IMG(&format!("foo at {}", Lsn(0x70)))
);
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
// This needs to traverse to the parent, and fails.
let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
assert!(err
.to_string()
.contains("will not become active. Current state: Broken"));
Ok(())
}
@@ -3748,7 +3754,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc().await?;
tline.gc(&ctx).await?;
}
Ok(())
@@ -3820,7 +3826,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc().await?;
tline.gc(&ctx).await?;
}
Ok(())
@@ -3904,7 +3910,7 @@ mod tests {
.await?;
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
tline.gc().await?;
tline.gc(&ctx).await?;
}
Ok(())

View File

@@ -292,77 +292,93 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
impl TenantConfOpt {
#[allow(clippy::too_many_arguments)]
fn from_request(
checkpoint_distance: Option<u64>,
checkpoint_timeout: &Option<String>,
compaction_target_size: Option<u64>,
compaction_period: &Option<String>,
compaction_threshold: Option<usize>,
gc_horizon: Option<u64>,
gc_period: &Option<String>,
image_creation_threshold: Option<usize>,
pitr_interval: &Option<String>,
walreceiver_connect_timeout: &Option<String>,
lagging_wal_timeout: &Option<String>,
max_lsn_wal_lag: Option<NonZeroU64>,
trace_read_requests: Option<bool>,
eviction_policy: &Option<serde_json::Value>,
min_resident_size_override: Option<u64>,
evictions_low_residence_duration_metric_threshold: &Option<String>,
) -> Result<Self, anyhow::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
if let Some(gc_period) = &gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
tenant_conf.gc_horizon = gc_horizon;
tenant_conf.image_creation_threshold = image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
if let Some(pitr_interval) = &pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
if let Some(walreceiver_connect_timeout) = &walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
if let Some(lagging_wal_timeout) = &lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
if let Some(max_lsn_wal_lag) = max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
if let Some(trace_read_requests) = trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_distance = checkpoint_distance;
if let Some(checkpoint_timeout) = &checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
tenant_conf.compaction_target_size = compaction_target_size;
tenant_conf.compaction_threshold = compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
if let Some(compaction_period) = &compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
if let Some(eviction_policy) = &eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
tenant_conf.min_resident_size_override = min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
&evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
@@ -377,81 +393,53 @@ impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
}
}
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
Self::from_request(
request_data.checkpoint_distance,
&request_data.checkpoint_timeout,
request_data.compaction_target_size,
&request_data.compaction_period,
request_data.compaction_threshold,
request_data.gc_horizon,
&request_data.gc_period,
request_data.image_creation_threshold,
&request_data.pitr_interval,
&request_data.walreceiver_connect_timeout,
&request_data.lagging_wal_timeout,
request_data.max_lsn_wal_lag,
request_data.trace_read_requests,
&request_data.eviction_policy,
request_data.min_resident_size_override,
&request_data.evictions_low_residence_duration_metric_threshold,
)
}
}
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
Self::from_request(
request_data.checkpoint_distance,
&request_data.checkpoint_timeout,
request_data.compaction_target_size,
&request_data.compaction_period,
request_data.compaction_threshold,
request_data.gc_horizon,
&request_data.gc_period,
request_data.image_creation_threshold,
&request_data.pitr_interval,
&request_data.walreceiver_connect_timeout,
&request_data.lagging_wal_timeout,
request_data.max_lsn_wal_lag,
request_data.trace_read_requests,
&request_data.eviction_policy,
request_data.min_resident_size_override,
&request_data.evictions_low_residence_duration_metric_threshold,
)
}
}

View File

@@ -186,10 +186,20 @@ pub fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx)
match Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
}
} else {
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
Tenant::create_broken_tenant(conf, tenant_id)
Tenant::create_broken_tenant(
conf,
tenant_id,
"attaching mark file present but no remote storage configured".to_string(),
)
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
@@ -466,7 +476,8 @@ pub async fn attach_tenant(
"Cannot attach tenant {tenant_id}, local tenant directory already exists"
);
let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx);
let tenant =
Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx).context("spawn_attach")?;
vacant_entry.insert(tenant);
Ok(())
})

View File

@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::Tenant;
use super::{LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -126,6 +126,7 @@ pub(super) async fn gather_inputs(
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
@@ -318,7 +319,15 @@ pub(super) async fn gather_inputs(
// We left the 'size' field empty in all of the Segments so far.
// Now find logical sizes for all of the points that might need or benefit from them.
fill_logical_sizes(&timelines, &mut segments, limit, logical_size_cache, ctx).await?;
fill_logical_sizes(
&timelines,
&mut segments,
limit,
logical_size_cache,
cause,
ctx,
)
.await?;
Ok(ModelInputs {
segments,
@@ -336,6 +345,7 @@ async fn fill_logical_sizes(
segments: &mut [SegmentMeta],
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
@@ -373,13 +383,17 @@ async fn fill_logical_sizes(
let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
ctx,
cancel.child_token(),
));
joinset.spawn(
calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
cause,
ctx,
cancel.child_token(),
)
.in_current_span(),
);
}
e.insert(cached_size);
}
@@ -482,6 +496,7 @@ async fn calculate_logical_size(
limit: Arc<tokio::sync::Semaphore>,
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
@@ -490,7 +505,7 @@ async fn calculate_logical_size(
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, ctx, cancel)
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))

View File

@@ -259,6 +259,7 @@ pub(crate) async fn random_init_delay(
}
}
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
if elapsed >= period && period != Duration::ZERO {
@@ -271,5 +272,8 @@ pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task
task,
"task iteration took longer than the configured period"
);
crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
.with_label_values(&[task, &format!("{}", period.as_secs())])
.inc();
}
}

View File

@@ -22,8 +22,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -48,7 +47,7 @@ use crate::tenant::{
};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -123,6 +122,17 @@ pub struct Timeline {
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
/// Newly created image layer doesn't help to remove the delta layer, until the
/// newly created image layer falls off the PITR horizon. So on next GC cycle,
/// gc_timeline may still want the new image layer to be created. To avoid redundant
/// image layers creation we should check if image layer exists but beyond PITR horizon.
/// This is why we need remember GC cutoff LSN.
///
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
@@ -227,9 +237,6 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
pub(super) delete_self:
utils::shared_retryable::SharedRetryable<Result<(), super::InnerDeleteTimelineError>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -399,6 +406,9 @@ pub enum PageReconstructError {
/// The operation was cancelled
Cancelled,
/// The ancestor of this is being stopped
AncestorStopping(TimelineId),
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(#[from] crate::walredo::WalRedoError),
@@ -417,6 +427,9 @@ impl std::fmt::Debug for PageReconstructError {
)
}
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
@@ -435,11 +448,22 @@ impl std::fmt::Display for PageReconstructError {
)
}
Self::Cancelled => write!(f, "cancelled"),
Self::AncestorStopping(timeline_id) => {
write!(f, "ancestor timeline {timeline_id} is being stopped")
}
Self::WalRedo(err) => err.fmt(f),
}
}
}
#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
Initial,
ConsumptionMetricsSyntheticSize,
EvictionTaskImitation,
TenantSizeHandler,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -929,6 +953,31 @@ impl Timeline {
self.state.subscribe()
}
pub async fn wait_to_become_active(
&self,
_ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
match current_state {
TimelineState::Loading => {
receiver
.changed()
.await
.expect("holding a reference to self");
}
TimelineState::Active { .. } => {
return Ok(());
}
TimelineState::Broken { .. } | TimelineState::Stopping => {
// There's no chance the timeline can transition back into ::Active
return Err(current_state);
}
}
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
@@ -1315,6 +1364,7 @@ impl Timeline {
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
walreceiver,
@@ -1382,8 +1432,6 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_self: utils::shared_retryable::SharedRetryable::default(),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1844,18 +1892,31 @@ impl Timeline {
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, &background_ctx, cancel)
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
{
Ok(s) => s,
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there ihs another call to this function, we'll retry
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
}
x @ Err(_) => x.context("Failed to calculate logical size")?,
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
@@ -1891,13 +1952,14 @@ impl Timeline {
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
},
}.in_current_span(),
);
}
pub fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
@@ -1920,22 +1982,26 @@ impl Timeline {
false,
async move {
let res = self_clone
.logical_size_calculation_task(lsn, &ctx, cancel)
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
.await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
},
}
.in_current_span(),
);
receiver
}
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
@@ -1943,7 +2009,7 @@ impl Timeline {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cancel, &ctx)
.calculate_logical_size(lsn, cause, cancel, &ctx)
.await
});
let timeline_state_cancellation = async {
@@ -1998,6 +2064,7 @@ impl Timeline {
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
@@ -2031,7 +2098,15 @@ impl Timeline {
if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
return Ok(size);
}
let timer = self.metrics.logical_size_histo.start_timer();
let storage_time_metrics = match cause {
LogicalSizeCalculationCause::Initial
| LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
| LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
LogicalSizeCalculationCause::EvictionTaskImitation => {
&self.metrics.imitate_logical_size_histo
}
};
let timer = storage_time_metrics.start_timer();
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
@@ -2223,6 +2298,46 @@ impl Timeline {
Ok(timeline) => timeline,
Err(e) => return Err(PageReconstructError::from(e)),
};
// It's possible that the ancestor timeline isn't active yet, or
// is active but hasn't yet caught up to the branch point. Wait
// for it.
//
// This cannot happen while the pageserver is running normally,
// because you cannot create a branch from a point that isn't
// present in the pageserver yet. However, we don't wait for the
// branch point to be uploaded to cloud storage before creating
// a branch. I.e., the branch LSN need not be remote consistent
// for the branching operation to succeed.
//
// Hence, if we try to load a tenant in such a state where
// 1. the existence of the branch was persisted (in IndexPart and/or locally)
// 2. but the ancestor state is behind branch_lsn because it was not yet persisted
// then we will need to wait for the ancestor timeline to
// re-stream WAL up to branch_lsn before we access it.
//
// How can a tenant get in such a state?
// - ungraceful pageserver process exit
// - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
//
// NB: this could be avoided by requiring
// branch_lsn >= remote_consistent_lsn
// during branch creation.
match ancestor.wait_to_become_active(ctx).await {
Ok(()) => {}
Err(state) if state == TimelineState::Stopping => {
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
}
Err(state) => {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"Timeline {} will not become active. Current state: {:?}",
ancestor.timeline_id,
&state,
)));
}
}
ancestor.wait_lsn(timeline.ancestor_lsn, ctx).await?;
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
@@ -2800,6 +2915,30 @@ impl Timeline {
let layers = self.layers.read().unwrap();
let mut max_deltas = 0;
{
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
if wanted.overlaps(&img_range) {
//
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
// but create_image_layers creates image layers at last-record-lsn.
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
// but the range is already covered by image layers at more recent LSNs. Before we
// create a new image layer, check if the range is already covered at more recent LSNs.
if !layers
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
{
debug!(
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
img_range.start, img_range.end, cutoff_lsn, lsn
);
return Ok(true);
}
}
}
}
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn)?;
@@ -2919,6 +3058,12 @@ impl Timeline {
image_layers.push(image_layer);
}
}
// All layers that the GC wanted us to create have now been created.
//
// It's possible that another GC cycle happened while we were compacting, and added
// something new to wanted_image_layers, and we now clear that before processing it.
// That's OK, because the next GC iteration will put it back in.
*self.wanted_image_layers.lock().unwrap() = None;
// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
@@ -3522,7 +3667,7 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
pub(super) async fn gc(&self, ctx: &RequestContext) -> anyhow::Result<GcResult> {
let timer = self.metrics.garbage_collect_histo.start_timer();
fail_point!("before-timeline-gc");
@@ -3552,6 +3697,7 @@ impl Timeline {
pitr_cutoff,
retain_lsns,
new_gc_cutoff,
ctx,
)
.instrument(
info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff),
@@ -3571,6 +3717,7 @@ impl Timeline {
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();
@@ -3616,6 +3763,16 @@ impl Timeline {
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
// Do not collect keyspace for Unit tests
let gc_keyspace = if ctx.task_kind() == TaskKind::GarbageCollector {
Some(
self.collect_keyspace(self.get_last_record_lsn(), ctx)
.await?,
)
} else {
None
};
// Scan all layers in the timeline (remote or on-disk).
//
@@ -3699,6 +3856,21 @@ impl Timeline {
"keeping {} because it is the latest layer",
l.filename().file_name()
);
// Collect delta key ranges that need image layers to allow garbage
// collecting the layers.
// It is not so obvious whether we need to propagate information only about
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
if let Some(keyspace) = &gc_keyspace {
let layer_logical_size = keyspace.get_logical_size(&l.get_key_range());
let layer_age = new_gc_cutoff.0 - l.get_lsn_range().start.0;
if layer_logical_size <= layer_age {
wanted_image_layers.add_range(l.get_key_range());
}
}
}
result.layers_not_updated += 1;
continue 'outer;
}
@@ -3711,6 +3883,10 @@ impl Timeline {
);
layers_to_remove.push(Arc::clone(&l));
}
self.wanted_image_layers
.lock()
.unwrap()
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
let mut updates = layers.batch_update();
if !layers_to_remove.is_empty() {

View File

@@ -30,7 +30,7 @@ use crate::{
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
Tenant,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -120,6 +120,13 @@ impl Timeline {
}
let elapsed = start.elapsed();
crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
crate::metrics::EVICTION_ITERATION_DURATION
.get_metric_with_label_values(&[
&format!("{}", p.period.as_secs()),
&format!("{}", p.threshold.as_secs()),
])
.unwrap()
.observe(elapsed.as_secs_f64());
ControlFlow::Continue(start + p.period)
}
}
@@ -335,7 +342,12 @@ impl Timeline {
// imitiate on-restart initial logical size
let size = self
.calculate_logical_size(lsn, cancel.clone(), ctx)
.calculate_logical_size(
lsn,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
@@ -407,9 +419,15 @@ impl Timeline {
.inner();
let mut throwaway_cache = HashMap::new();
let gather =
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx)
.instrument(info_span!("gather_inputs"));
let gather = crate::tenant::size::gather_inputs(
tenant,
limit,
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
ctx,
)
.instrument(info_span!("gather_inputs"));
tokio::select! {
_ = cancel.cancelled() => {}

View File

@@ -305,6 +305,15 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = true;
}
}
} else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
utils::failpoint_sleep_millis_async!("wal-ingest-logical-message-sleep");
}
}
// Iterate through all the blocks that the record modifies, and

View File

@@ -52,7 +52,7 @@ typedef struct
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), internalerrposition(0)))
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/*
* supertype of all the Neon*Request structs below

View File

@@ -21,6 +21,7 @@ FLAKY_TESTS_QUERY = """
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'retriesStatusChange' as retries_status_change,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
FROM
regress_test_results
@@ -29,7 +30,7 @@ FLAKY_TESTS_QUERY = """
) data
WHERE
timestamp > CURRENT_DATE - INTERVAL '%s' day
AND status::text IN ('"failed"', '"broken"')
AND (status::text IN ('"failed"', '"broken"') OR retries_status_change::boolean)
;
"""

View File

@@ -1,6 +1,5 @@
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
// It accepts an array of items and creates a comment with a summary for each one (for "release" and "debug", together or separately if any of them failed to be generated).
//
// The comment is updated on each run with the latest results.
//
@@ -13,19 +12,37 @@
// github,
// context,
// fetch,
// reports: [{...}, ...], // each report is expected to have "buildType", "reportUrl", and "jsonUrl" properties
// report: {
// reportUrl: "...",
// reportJsonUrl: "...",
// },
// })
//
module.exports = async ({ github, context, fetch, reports }) => {
// Analog of Python's defaultdict.
//
// const dm = new DefaultMap(() => new DefaultMap(() => []))
// dm["firstKey"]["secondKey"].push("value")
//
class DefaultMap extends Map {
constructor(getDefaultValue) {
return new Proxy({}, {
get: (target, name) => name in target ? target[name] : (target[name] = getDefaultValue(name))
})
}
}
module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// The latest commit in the PR URL
const commitUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/pull/${context.payload.number}/commits/${context.payload.pull_request.head.sha}`
// Commend body itself
let commentBody = `${startMarker}\n### Test results for ${commitUrl}:\n___\n`
let commentBody = `${startMarker}\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
@@ -33,76 +50,119 @@ module.exports = async ({ github, context, fetch, reports }) => {
repo: context.repo.repo,
}
for (const report of reports) {
const {buildType, reportUrl, jsonUrl} = report
const {reportUrl, reportJsonUrl} = report
if (!reportUrl || !jsonUrl) {
commentBody += `#### ${buildType} build: no tests were run or test report is not available\n`
continue
}
if (!reportUrl || !reportJsonUrl) {
commentBody += `#### No tests were run or test report is not available\n`
commentBody += autoupdateNotice
return
}
const suites = await (await fetch(jsonUrl)).json()
const suites = await (await fetch(reportJsonUrl)).json()
// Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests.
// For this report it's ok to treat them in the same way (as failed).
failedTests = []
passedTests = []
skippedTests = []
// Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests.
// For this report it's ok to treat them in the same way (as failed).
const failedTests = new DefaultMap(() => new DefaultMap(() => []))
const passedTests = new DefaultMap(() => new DefaultMap(() => []))
const skippedTests = new DefaultMap(() => new DefaultMap(() => []))
const retriedTests = new DefaultMap(() => new DefaultMap(() => []))
const flakyTests = new DefaultMap(() => new DefaultMap(() => []))
retriedTests = []
retriedStatusChangedTests = []
let failedTestsCount = 0
let passedTestsCount = 0
let skippedTestsCount = 0
let flakyTestsCount = 0
for (const parentSuite of suites.children) {
for (const suite of parentSuite.children) {
for (const test of suite.children) {
pytestName = `${parentSuite.name.replace(".", "/")}/${suite.name}.py::${test.name}`
test.pytestName = pytestName
const pgVersions = new Set()
const buildTypes = new Set()
if (test.status === "passed") {
passedTests.push(test);
} else if (test.status === "failed" || test.status === "broken") {
failedTests.push(test);
} else if (test.status === "skipped") {
skippedTests.push(test);
}
for (const parentSuite of suites.children) {
for (const suite of parentSuite.children) {
for (const test of suite.children) {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({buildType, pgVersion} = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
if (test.retriesCount > 0) {
retriedTests.push(test);
buildType = "release"
pgVersion = "14"
}
if (test.retriedStatusChangedTests) {
retriedStatusChangedTests.push(test);
}
pgVersions.add(pgVersion)
buildTypes.add(buildType)
// Removing build type and PostgreSQL version from the test name to make it shorter
const testName = test.name.replace(new RegExp(`${buildType}-pg${pgVersion}-?`), "").replace("[]", "")
test.pytestName = `${parentSuite.name.replace(".", "/")}/${suite.name}.py::${testName}`
if (test.status === "passed") {
passedTests[pgVersion][buildType].push(test)
passedTestsCount += 1
} else if (test.status === "failed" || test.status === "broken") {
failedTests[pgVersion][buildType].push(test)
failedTestsCount += 1
} else if (test.status === "skipped") {
skippedTests[pgVersion][buildType].push(test)
skippedTestsCount += 1
}
if (test.retriesCount > 0) {
retriedTests[pgVersion][buildType].push(test)
if (test.retriesStatusChange) {
flakyTests[pgVersion][buildType].push(test)
flakyTestsCount += 1
}
}
}
}
}
const totalTestsCount = failedTests.length + passedTests.length + skippedTests.length
commentBody += `#### ${buildType} build: ${totalTestsCount} tests run: ${passedTests.length} passed, ${failedTests.length} failed, ${skippedTests.length} skipped ([full report](${reportUrl}))\n`
if (failedTests.length > 0) {
commentBody += `Failed tests:\n`
for (const test of failedTests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}) for ${commitUrl})\n___\n`
commentBody += `- [\`${test.pytestName}\`](${allureLink})`
if (test.retriesCount > 0) {
commentBody += ` (ran [${test.retriesCount + 1} times](${allureLink}/retries))`
// Print test resuls from the newest to the oldest PostgreSQL version for release and debug builds.
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
for (const buildType of Array.from(buildTypes).sort().reverse()) {
if (failedTests[pgVersion][buildType].length > 0) {
commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n`
commentBody += `Failed tests:\n`
for (const test of failedTests[pgVersion][buildType]) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
commentBody += `- [\`${test.pytestName}\`](${allureLink})`
if (test.retriesCount > 0) {
commentBody += ` (ran [${test.retriesCount + 1} times](${allureLink}/retries))`
}
commentBody += "\n"
}
commentBody += "\n"
}
commentBody += "\n"
}
if (retriedStatusChangedTests > 0) {
commentBody += `Flaky tests:\n`
for (const test of retriedStatusChangedTests) {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
commentBody += `- ${status} [\`${test.pytestName}\`](${reportUrl}#suites/${test.parentUid}/${test.uid}/retries)\n`
}
commentBody += "\n"
}
commentBody += "___\n"
}
if (flakyTestsCount > 0) {
commentBody += "<details>\n<summary>Flaky tests</summary>\n\n"
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
for (const buildType of Array.from(buildTypes).sort().reverse()) {
if (flakyTests[pgVersion][buildType].length > 0) {
commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n`
for (const test of flakyTests[pgVersion][buildType]) {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
commentBody += `- ${status} [\`${test.pytestName}\`](${reportUrl}#suites/${test.parentUid}/${test.uid}/retries)\n`
}
commentBody += "\n"
}
}
}
commentBody += "\n</details>\n"
}
commentBody += autoupdateNotice
const { data: comments } = await github.rest.issues.listComments({
issue_number: context.payload.number,
...ownerRepoParams,

View File

@@ -1,5 +1,6 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.allure",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -0,0 +1,25 @@
import os
import pytest
from fixtures.pg_version import DEFAULT_VERSION, PgVersion
"""
Set of utilities to make Allure report more informative.
- It adds BUILD_TYPE and DEFAULT_PG_VERSION to the test names (only in test_runner/regress)
to make tests distinguishable in Allure report.
"""
@pytest.fixture(scope="function", autouse=True)
def allure_noop():
pass
def pytest_generate_tests(metafunc):
if "test_runner/regress" in metafunc.definition._nodeid:
build_type = os.environ.get("BUILD_TYPE", "DEBUG").lower()
pg_version = PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION))
metafunc.parametrize("allure_noop", [f"{build_type}-pg{pg_version}"])

View File

@@ -451,13 +451,17 @@ def pytest_terminal_summary(
revision = os.getenv("GITHUB_SHA", "local")
platform = os.getenv("PLATFORM", "local")
terminalreporter.section("Benchmark results", "-")
is_header_printed = False
result = []
for test_report in terminalreporter.stats.get("passed", []):
result_entry = []
for _, recorded_property in test_report.user_properties:
if not is_header_printed:
terminalreporter.section("Benchmark results", "-")
is_header_printed = True
terminalreporter.write(
"{}.{}: ".format(test_report.head_line, recorded_property["name"])
)
@@ -485,7 +489,6 @@ def pytest_terminal_summary(
out_dir = config.getoption("out_dir")
if out_dir is None:
warnings.warn("no out dir provided to store performance test results")
return
if not result:

View File

@@ -272,6 +272,7 @@ class PageserverHttpClient(requests.Session):
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
@@ -281,7 +282,9 @@ class PageserverHttpClient(requests.Session):
if pg_version != PgVersion.NOT_SET:
body["pg_version"] = int(pg_version)
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body)
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")

View File

@@ -46,6 +46,20 @@ class PgVersion(str, enum.Enum):
DEFAULT_VERSION: PgVersion = PgVersion.V14
def skip_on_postgres(version: PgVersion, reason: str):
return pytest.mark.skipif(
PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION)) is version,
reason=reason,
)
def xfail_on_postgres(version: PgVersion, reason: str):
return pytest.mark.xfail(
PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION)) is version,
reason=reason,
)
def pytest_addoption(parser: Parser):
parser.addoption(
"--pg-version",

View File

@@ -0,0 +1,76 @@
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
n_steps = 10
n_update_iters = 100
step_size = 10000
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
)
cur.execute("CREATE INDEX ON t(step)")
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
# without modifying the earlier parts of the table.
for step in range(n_steps):
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
for i in range(n_update_iters):
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
cur.execute("vacuum t")
# cur.execute("select pg_table_size('t')")
# logical_size = cur.fetchone()[0]
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
log.info(f"Logical storage size {logical_size}")
client.timeline_checkpoint(tenant_id, timeline_id)
# Do compaction and GC
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
# One more iteration to check that no excessive image layers are generated
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)

View File

@@ -16,7 +16,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -41,12 +41,15 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
)
# Note: if renaming this test, don't forget to update a reference to it in a workflow file:
# "Upload compatibility snapshot" step in .github/actions/run-python-test-set/action.yml
@check_ondisk_data_compatibility_if_enabled
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path):
def test_create_snapshot(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
top_output_dir: Path,
test_output_dir: Path,
):
# The test doesn't really test anything
# it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`.
#
@@ -86,10 +89,14 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
sk.stop()
env.pageserver.stop()
shutil.copytree(test_output_dir, test_output_dir / "compatibility_snapshot_pg14")
# Directory `test_output_dir / "compatibility_snapshot_pg14"` is uploaded to S3 in a workflow, keep the name in sync with it
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
if compatibility_snapshot_dir.exists():
shutil.rmtree(compatibility_snapshot_dir)
shutil.copytree(test_output_dir, compatibility_snapshot_dir)
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@@ -148,11 +155,13 @@ def test_backward_compatibility(
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
test_output_dir: Path,
top_output_dir: Path,
port_distributor: PortDistributor,
pg_version: PgVersion,
request: FixtureRequest,
@@ -174,9 +183,7 @@ def test_forward_compatibility(
), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)"
compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()
compatibility_snapshot_dir = (
test_output_dir.parent / "test_create_snapshot" / "compatibility_snapshot_pg14"
)
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
breaking_changes_allowed = (
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"

View File

@@ -136,9 +136,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# remove the initial tenant
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
assert env.initial_timeline
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
pageserver_http.tenant_detach(env.initial_tenant)
assert isinstance(env.remote_storage, LocalFsStorage)
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)

View File

@@ -1,7 +1,9 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion, xfail_on_postgres
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4182")
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -5,6 +5,7 @@ from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion, xfail_on_postgres
# Run the main PostgreSQL regression tests, in src/test/regress.
@@ -71,6 +72,7 @@ def test_pg_regress(
#
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4213")
@pytest.mark.timeout(1800)
def test_isolation(
neon_simple_env: NeonEnv,

View File

@@ -2,11 +2,12 @@
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os
import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple
import pytest
from fixtures.log_helper import log
@@ -26,6 +27,7 @@ from fixtures.pageserver.utils import (
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import print_gc_result, query_scalar, wait_until
from requests import ReadTimeout
#
@@ -626,10 +628,7 @@ def test_empty_branch_remote_storage_upload(
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
timelines_before_detach = set(
map(
@@ -658,13 +657,19 @@ def test_empty_branch_remote_storage_upload(
), f"Expected to have same timelines after reattach, but got {timelines_after_detach}"
# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only.
# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart.
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_empty_branch_remote_storage_upload_on_restart(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
"""
Branches off a root branch, but does not write anything to the new branch, so
it has a metadata file only.
Ensures the branch is not on the remote storage and restarts the pageserver
— the upload should be scheduled by load, and create_timeline should await
for it even though it gets 409 Conflict.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_empty_branch_remote_storage_upload_on_restart",
@@ -673,35 +678,87 @@ def test_empty_branch_remote_storage_upload_on_restart(
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
client.configure_failpoints(("before-upload-index", "return"))
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
new_branch_timeline_id = TimelineId.generate()
with pytest.raises(ReadTimeout):
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
timeout=4,
)
env.pageserver.allowed_errors.append(
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
# index upload is now hitting the failpoint, should not block the shutdown
env.pageserver.stop()
# Remove new branch from the remote storage
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = (
env.remote_storage.root
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(new_branch_timeline_id)
timeline_path = (
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
)
assert (
new_branch_on_remote_storage.is_dir()
), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage"
shutil.rmtree(new_branch_on_remote_storage)
env.pageserver.start()
local_metadata = env.repo_dir / timeline_path / "metadata"
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
assert (
new_branch_on_remote_storage.is_dir()
), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
not new_branch_on_remote_storage.exists()
), "failpoint should had prohibited index_part.json upload"
# during reconciliation we should had scheduled the uploads and on the
# retried create_timeline, we will await for those to complete on next
# client.timeline_create
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
# sleep a bit to force the upload task go into exponential backoff
time.sleep(1)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
def create_in_background():
barrier.wait()
try:
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
)
q.put(None)
except PageserverApiException as e:
q.put(e)
create_thread = threading.Thread(target=create_in_background)
create_thread.start()
try:
# maximize chances of actually waiting for the uploads by create_timeline
barrier.wait()
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
client.configure_failpoints(("before-upload-index", "off"))
conflict = q.get()
assert conflict, "create_timeline should not have succeeded"
assert (
conflict.status_code == 409
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage / "index_part.json"
).is_file(), "uploads scheduled during initial load should had been awaited for"
finally:
create_thread.join()
def wait_upload_queue_empty(
@@ -752,4 +809,17 @@ def get_queued_count(
return int(val)
def assert_nothing_to_upload(
client: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
):
"""
Check last_record_lsn == remote_consistent_lsn. Assert works only for empty timelines, which
do not have anything to compact or gc.
"""
detail = client.timeline_detail(tenant_id, timeline_id)
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
# TODO Test that we correctly handle GC of files that are stuck in upload queue.

View File

@@ -1,5 +1,7 @@
import os
import shutil
import threading
import time
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
@@ -12,6 +14,8 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PortDistributor,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
@@ -512,3 +516,225 @@ def test_tenant_relocation(
if line.startswith("listen_pg_addr"):
lines[i] = f"listen_pg_addr = 'localhost:{env.pageserver.service_port.pg}'"
(env.repo_dir / "config").write_text("\n".join(lines))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
#
# This exercises a race condition after tenant attach, where the
# branch point on the ancestor timeline is greater than the ancestor's
# last-record LSN. We had a bug where GetPage incorrectly followed the
# timeline to the ancestor without waiting for the missing WAL to
# arrive.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_slow_replay(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_emergency_relocate_with_branches_slow_replay",
)
env = neon_env_builder.init_start()
env.pageserver.is_testing_enabled_or_skip()
pageserver_http = env.pageserver.http_client()
# Prepare for the test:
#
# - Main branch, with a table and two inserts to it.
# - A logical replication message between the inserts, so that we can conveniently
# pause the WAL ingestion between the two inserts.
# - Child branch, created after the inserts
tenant_id, _ = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
cur.execute("CREATE TABLE test_reattach (t text)")
cur.execute("INSERT INTO test_reattach VALUES ('before pause')")
cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')")
cur.execute("INSERT INTO test_reattach VALUES ('after pause')")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
main_endpoint.stop()
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
# Now kill the pageserver, remove the tenant directory, and restart. This simulates
# the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
# the tenant to a different pageserver. We reuse the same pageserver because it's
# simpler than initializing a new one from scratch, but the effect on the single tenant
# is the same.
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
env.pageserver.start()
# This fail point will pause the WAL ingestion on the main branch, after the
# the first insert
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
# Attach and wait a few seconds to give it time to load the tenants, attach to the
# safekeepers, and to stream and ingest the WAL up to the pause-point.
before_attach_time = time.time()
pageserver_http.tenant_attach(tenant_id)
time.sleep(3)
# The wal ingestion on the main timeline should now be paused at the fail point.
# Run a query on the child branch. The GetPage requests for this should recurse to the
# parent timeline, and wait for the WAL to be ingested there. Otherwise it won't see
# the second insert.
child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id)
with child_endpoint.cursor() as cur:
cur.execute("SELECT * FROM test_reattach")
assert cur.fetchall() == [("before pause",), ("after pause",)]
# Sanity check that the failpoint was reached
assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
#
# This exercises the same race condition after as
# 'test_emergency_relocate_with_branches_slow_replay', but this test case
# is closer to the original scenario where we originally found the
# issue.
#
# In this scenario, the incorrect result to get-request leads to
# *permanent damage* in the child timeline, because ingesting the WAL
# on the child timeline depended on incorrect view of the parent. This
# test reproduced one such case; the symptom was an error on the child, when
# trying to connect to the child endpoint after re-attaching the tenant:
#
# FATAL: database "neondb" does not exist
#
# In the original case where we bumped into this, the error was slightly
# different:
#
# FATAL: "base/16385" is not a valid data directory
# DETAIL: File "base/16385/PG_VERSION" is missing.
#
### Detailed explanation of the original bug and why it lead to that error:
#
# The WAL on the main and the child branches look like this:
#
# Main Child
# 1. CREATE DATABASE
# <child branch is created>
# 2. CREATE TABLE AS SELECT ... 3. CREATE TABLE AS SELECT ...
#
# None of these WAL records have been flushed to disk or uploaded to remote
# storage in the pageserver yet, when the tenant is detached.
#
# After detach and re-attach, a walreceiver is spawned on both timelines.
# They will connect to the safekeepers and start ingesting the WAL
# from their respective IndexParts' `disk_consistent_lsn` onward.
#
# The bug occurs if the child branch's walreceiver runs before the
# main's. It receives the SMGR_CREATE WAL record emitted by the
# CREATE TABLE statement (3.), and applies it, without seeing the
# effect of the CREATE DATABASE statement.
#
# To understand why that leads to a 'File "base/16385/PG_VERSION" is
# missing' error, let's look at what the handlers for the WAL records
# do:
#
# CREATE DATABASE WAL record is handled by ingest_xlog_dbase_create:
#
# ingest_xlog_dbase_create:
# put_relmap_file()
# // NOTE 'true': It means that there is a relmapper and PG_VERSION file
# 1: let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
#
#
# CREATE TABLE emits an SMGR_CREATE record, which is handled by:
#
# ingest_xlog_smgr_create:
# put_rel_creation:
# ...
# let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
# 2: // Didn't exist. Update dbdir
# dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
# let buf = DbDirectory::ser(&dbdir)?;
# self.put(DBDIR_KEY, Value::Image(buf.into()));
#
# // and create the RelDirectory
# RelDirectory::default()
# } else {
# 3: // reldir already exists, fetch it
# RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
# };
#
#
# In the correct ordering, the SMGR_CREATE record is applied after the
# CREATE DATABASE record. The CREATE DATABASE creates the entry in the
# 'dbdir', with the 'true' flag that indicates that PG_VERSION exists
# (1). The SMGR_CREATE handler calls put_rel_creation, which finds the
# dbdir entry that the CREATE DATABASE record created, and takes the
# "reldir already exists, fetch it" else-branch at the if statement (3).
#
# In the incorrect ordering, the child walreceiver applies the
# SMGR_CREATE record without seeing the effects of the CREATE
# DATABASE. In that case, put_rel_creation takes the "Didn't
# exist. Update dbir" path (2), and inserts an entry in the
# DbDirectory with 'false' to indicate there is no PG_VERSION file.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_createdb(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_emergency_relocate_with_branches_createdb",
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# create new nenant
tenant_id, _ = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')")
cur.execute("CREATE DATABASE neondb")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
with main_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
main_endpoint.stop()
child_endpoint = env.endpoints.create_start("child", tenant_id=tenant_id)
with child_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
child_endpoint.stop()
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
env.pageserver.start()
# Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original
# bug reproduced easily even without this, as there is always some delay between
# loading the timeline and establishing the connection to the safekeeper to stream and
# ingest the WAL, but let's make this less dependent on accidental timing.
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
before_attach_time = time.time()
pageserver_http.tenant_attach(tenant_id)
child_endpoint.start()
with child_endpoint.cursor(dbname="neondb") as cur:
assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200
# Sanity check that the failpoint was reached
assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))

View File

@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
wait_for_wal_insert_lsn,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@@ -512,6 +513,7 @@ def test_single_branch_get_tenant_size_grows(
assert size_after == prev, "size after restarting pageserver should not have changed"
@xfail_on_postgres(PgVersion.V15, reason="Test significantly more flaky on Postgres 15")
def test_get_tenant_size_with_multiple_branches(
neon_env_builder: NeonEnvBuilder, test_output_dir: Path
):

View File

@@ -346,31 +346,20 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
def delete_timeline_call(name, result_queue, barrier):
if barrier:
barrier.wait()
def first_call(result_queue):
try:
log.info(f"{name} call start")
log.info("first call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
log.info(f"{name} call success")
log.info("first call success")
result_queue.put("success")
except Exception:
log.exception(f"{name} call failed")
log.exception("first call failed")
result_queue.put("failure, see log for stack trace")
delete_results: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"1st",
delete_results,
None,
),
)
first_call_result: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
first_call_thread.start()
second_call_thread = None
try:
def first_call_hit_failpoint():
@@ -380,42 +369,30 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
wait_until(50, 0.1, first_call_hit_failpoint)
barrier = threading.Barrier(2)
second_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"2nd",
delete_results,
barrier,
),
)
second_call_thread.start()
barrier.wait()
# release the pause
ps_http.configure_failpoints((failpoint_name, "off"))
# both should had succeeded: the second call will coalesce with the already-ongoing first call
result = delete_results.get()
assert result == "success"
result = delete_results.get()
assert result == "success"
# the second call will try to transition the timeline into Stopping state, but it's already in that state
# (the transition to Stopping state is not part of the request coalescing, because Tenant and Timeline states are a mess already)
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
log.info("second call failed as expected")
# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))
result = first_call_result.get()
assert result == "success"
finally:
log.info("joining 1st thread")
log.info("joining first call thread")
# in any case, make sure the lifetime of the thread is bounded to this test
first_call_thread.join()
if second_call_thread:
log.info("joining 2nd thread")
second_call_thread.join()
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
@@ -463,16 +440,9 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
# ok, retry without failpoint, it should succeed
ps_http.configure_failpoints((failpoint_name, "off"))
try:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
except PageserverApiException as e:
if e.status_code != 404:
raise e
else:
# mock_s3 was fast enough to delete before we got the request in
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Error processing HTTP request: NotFound: timeline not found"
)
# this should succeed
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)

View File

@@ -1,14 +1,20 @@
import sys
from pathlib import Path
import pytest
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
VanillaPostgres,
)
from fixtures.types import TenantId
from fixtures.types import TenantId, TimelineId
@pytest.mark.skipif(
sys.platform != "linux",
reason="restore_from_wal.sh supports only Linux",
)
def test_wal_restore(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
@@ -22,6 +28,7 @@ def test_wal_restore(
endpoint = env.endpoints.create_start("test_wal_restore")
endpoint.safe_psql("create table t as select generate_series(1,300000)")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
@@ -30,9 +37,16 @@ def test_wal_restore(
) as restored:
pg_bin.run_capture(
[
str(base_dir / "libs/utils/scripts/restore_from_wal.sh"),
str(base_dir / "libs" / "utils" / "scripts" / "restore_from_wal.sh"),
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(
test_output_dir
/ "repo"
/ "safekeepers"
/ "sk1"
/ str(tenant_id)
/ str(timeline_id)
),
str(data_dir),
str(port),
]