Compare commits

..

40 Commits

Author SHA1 Message Date
John Spray
1e392a00e4 pageserver: add GcInfo::notify_child_keyspace 2024-07-06 19:51:57 +01:00
John Spray
3a6101bd21 pageserver: initialize layer visbility opportunistically 2024-07-06 19:51:57 +01:00
John Spray
4dbb8f4c18 pageserver: exclude covered layers from heatmap 2024-07-06 19:51:57 +01:00
John Spray
d7bca9fcdb pageserver: implementation of update_layer_visibility 2024-07-06 19:51:57 +01:00
John Spray
c78c810118 pageserver: move out common compaction step 2024-07-06 18:38:49 +01:00
John Spray
b9f1fa5edb pageserver: add LayerVisibility 2024-07-06 18:38:49 +01:00
John Spray
b874f1dc94 pageserver: maintain GcInfo incrementally (2/2) 2024-07-06 16:08:24 +01:00
John Spray
8b6e076983 pageserver: hold child timeline IDs in retain_lsns 2024-07-06 16:08:24 +01:00
John Spray
9d042caa0d pageserver: maintain GcInfo incrementally 2024-07-06 16:08:24 +01:00
John Spray
a33b3d93f4 pageserver: ordered entries in sparse keyspace 2024-07-06 16:08:24 +01:00
Arpad Müller
0a937b7f91 Add concurrency to the find-large-objects scrubber subcommand (#8291)
The find-large-objects scrubber subcommand is quite fast if you run it
in an environment with low latency to the S3 bucket (say an EC2 instance
in the same region). However, the higher the latency gets, the slower
the command becomes. Therefore, add a concurrency param and make it
parallelized. This doesn't change that general relationship, but at
least lets us do multiple requests in parallel and therefore hopefully
faster.

Running with concurrency of 64 (default):

```
2024-07-05T17:30:22.882959Z  INFO lazy_load_identity [...]
[...]
2024-07-05T17:30:28.289853Z  INFO Scanned 500 shards. [...]
```

With concurrency of 1, simulating state before this PR:

```
2024-07-05T17:31:43.375153Z  INFO lazy_load_identity [...]
[...]
2024-07-05T17:33:51.987092Z  INFO Scanned 500 shards. [...]
```

In other words, to list 500 shards, speed is increased from 2:08 minutes
to 6 seconds.

Follow-up of  #8257, part of #5431
2024-07-05 21:36:28 +01:00
Arpad Müller
b8d031cd0c Improve parsing of ImageCompressionAlgorithm (#8281)
Improve parsing of the `ImageCompressionAlgorithm` enum to allow level
customization like `zstd(1)`, as strum only takes `Default::default()`,
i.e. `None` as the level.

Part of #5431
2024-07-05 20:18:05 +00:00
Christian Schwarz
f0d29a0f3e pageserver_live_connections: track as counter pair (#8227)
Generally counter pairs are preferred over gauges.
In this case, I found myself asking what the typical rate of accepted
page_service connections on a pageserver is, and I couldn't answer it
with the gauge metric.

There are a few dashboards using this metric:

https://github.com/search?q=repo%3Aneondatabase%2Fgrafana-dashboard-export%20pageserver_live_connections&type=code

I'll convert them to use the new metric once this PR reaches prod.

refs https://github.com/neondatabase/neon/issues/7427
2024-07-05 21:17:05 +01:00
Konstantin Knizhnik
13522fb722 Increase timeout for wating subscriber caught-up (#8118)
## Problem

test_subscriber_restart has quit large failure rate'

https://neonprod.grafana.net/d/fddp4rvg7k2dcf/regression-test-failures?orgId=1&var-test_name=test_subscriber_restart&var-max_count=100&var-restrict=false

I can be caused by too small timeout (5 seconds) to wait until changes
are propagated.

Related to #8097

## Summary of changes

Increase timeout to 30 seconds.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-07-05 20:39:10 +03:00
Alexander Bayandin
c9fd8d7693 SELECT 💣(); (#8270)
## Problem
We want to be able to test how our infrastructure reacts on segfaults in
Postgres (for example, we collect cores, and get some required
logs/metrics, etc)

## Summary of changes
- Add `trigger_segfauls` function to `neon_test_utils` to trigger a
segfault in Postgres
- Add `trigger_panic` function to `neon_test_utils` to trigger SIGABRT
(by using `elog(PANIC, ...))
- Fix cleanup logic in regression tests in endpoint crashed
2024-07-05 15:12:01 +01:00
Vlad Lazar
7dd2e447d3 pageserver: add time based image layer creation check (#8247)
## Problem
Assume a timeline with the following workload: very slow ingest of
updates to a small number of keys that fit within the same partition (as decided by
`KeySpace::partition`). These tenants will create small L0 layers since due to time 
based rolling, and, consequently, the L1 layers will also be small.

Currently, by default, we need to ingest 512 MiB of WAL before checking
if an image layer is required. This scheme works fine under the assumption that L1s are roughly of
checkpoint distance size, but as the first paragraph explained, that's not the case for all workloads.

## Summary of changes
Check if new image layers are required at least once every checkpoint timeout interval.
2024-07-05 14:02:02 +01:00
John Spray
6849ae4810 safekeeper: add separate tombstones map for deleted timelines (#8253)
## Problem

Safekeepers left running for a long time use a lot of memory (up to the
point of OOMing, on small nodes) for deleted timelines, because the
`Timeline` struct is kept alive as a guard against recreating deleted
timelines.

Closes: https://github.com/neondatabase/neon/issues/6810

## Summary of changes

- Create separate tombstones that just record a ttid and when the
timeline was deleted.
- Add a periodic housekeeping task that cleans up tombstones older than
a hardcoded TTL (24h)

I think this also makes https://github.com/neondatabase/neon/pull/6766
un-needed, as the tombstone is also checked during deletion.

I considered making the overall timeline map use an enum type containing
active or deleted, but having a separate map of tombstones avoids
bloating that map, so that calls like `get()` can still go straight to a
timeline without having to walk a hashmap that also contains tombstones.
2024-07-05 11:17:44 +01:00
John Spray
5aae80640b tests: make location_conf_churn more robust (#8271)
## Problem

This test directly manages locations on pageservers and configuration of
an endpoint. However, it did not switch off the parts of the storage
controller that attempt to do the same: occasionally, the test would
fail in a strange way such as a compute failing to accept a
reconfiguration request.

## Summary of changes

- Wire up the storage controller's compute notification hook to a no-op
handler
- Configure the tenant's scheduling policy to Stop.
2024-07-05 10:34:16 +01:00
Peter Bendel
6876f0d066 correct error handling for periodic pagebench runner status (#8274)
## Problem

the following periodic pagebench run was failed but was still shown as
successful


https://github.com/neondatabase/neon/actions/runs/9798909458/job/27058179993#step:9:47

## Summary of changes

if the ec2 test runner reports a failure fail the job step and thus the
workflow

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-07-05 10:23:46 +01:00
John Spray
e25ac31fc9 tests: extend allow list in deletion test (#8268)
## Problem

1ea5d8b132 tolerated this as an error
message, but it can show up in logs as well.

Example failure:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8201/9780147712/index.html#testresult/263422f5f5f292ea/retries

## Summary of changes

- Tolerate "failed to delete 1 objects" in pageserver logs, this occurs
occasionally when injected failures exhaust deletion's retries.
2024-07-05 10:09:15 +01:00
Peter Bendel
711716c725 add checkout depth1 to workflow to access local github actions like generate allure report (#8259)
## Problem

job step to create allure report fails


https://github.com/neondatabase/neon/actions/runs/9781886710/job/27006997416#step:11:1

## Summary of changes

Shallow checkout of sources to get access to local github action needed
in the job step

## Example run 
example run with this change
https://github.com/neondatabase/neon/actions/runs/9790647724
do not merge this PR until the job is clean

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-07-04 22:17:45 +02:00
Konstantin Knizhnik
88b13d4552 implement rolling hyper-log-log algorithm (#8068)
## Problem

See #7466

## Summary of changes

Implement algorithm descried in
https://hal.science/hal-00465313/document

Now new GUC is added:
`neon.wss_max_duration` which specifies size of sliding window (in
seconds). Default value is 1 hour.

It is possible to request estimation of working set sizes (within this
window using new function
`approximate_working_set_size_seconds`. Old function
`approximate_working_set_size` is preserved for backward compatibility.
But its scope is also limited by `neon.wss_max_duration`.

Version of Neon extension is changed to 1.4

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2024-07-04 22:03:58 +03:00
Arpad Müller
adde0ecfe0 Flatten compression algorithm setting (#8265)
This flattens the compression algorithm setting, removing the
`Option<_>` wrapping layer and making handling of the setting easier.

It also adds a specific setting for *disabled* compression with the
continued ability to read copmressed data, giving us the option to
more easily back out of a compression rollout, should the need arise,
which was one of the limitations of #8238.

Implements my suggestion from
https://github.com/neondatabase/neon/pull/8238#issuecomment-2206181594 ,
inspired by Christian's review in
https://github.com/neondatabase/neon/pull/8238#pullrequestreview-2156460268 .

Part of #5431
2024-07-04 16:59:19 +00:00
Yuchen Liang
19accfee4e feat(pageserver): integrate lsn lease into synthetic size (#8220)
Part of #7497, closes #8071. (accidentally closed #8208, reopened here)

## Problem

After the changes in #8084, we need synthetic size to also account for
leased LSNs so that users do not get free retention by running a small
ephemeral endpoint for a long time.

## Summary of changes

This PR integrates LSN leases into the synthetic size calculation. We
model leases as read-only branches started at the leased LSN (except it
does not have a timeline id).

Other changes:
- Add new unit tests testing whether a lease behaves like a read-only
branch.
- Change `/size_debug` response to include lease point in the SVG
visualization.
- Fix `/lsn_lease` HTTP API to do proper parsing for POST.



Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-07-04 15:09:05 +00:00
Arpad Müller
e579bc0819 Add find-large-objects subcommand to scrubber (#8257)
Adds a find-large-objects subcommand to the scrubber to allow listing
layer objects larger than a specific size.

To be used like:

```
AWS_PROFILE=dev REGION=us-east-2 BUCKET=neon-dev-storage-us-east-2 cargo run -p storage_scrubber -- find-large-objects --min-size 250000000 --ignore-deltas
```

Part of #5431
2024-07-04 15:07:16 +00:00
John Spray
c9e6dd45d3 pageserver: downgrade stale generation messages to INFO (#8256)
## Problem

When generations were new, these messages were an important way of
noticing if something unexpected was going on. We found some real issues
when investigating tests that unexpectedly tripped them.

At time has gone on, this code is now pretty battle-tested, and as we do
more live migrations etc, it's fairly normal to see the occasional
message from a node with a stale generation.

At this point the cognitive load on developers to selectively allow-list
these logs outweighs the benefit of having them at warn severity.

Closes: https://github.com/neondatabase/neon/issues/8080

## Summary of changes

- Downgrade "Dropped remote consistent LSN updates" and "Dropping stale
deletions" messages to INFO
- Remove all the allow-list entries for these logs.
2024-07-04 15:05:41 +01:00
Alexander Bayandin
bf9fc77061 CI(pg-clients): unify workflow with build-and-test (#8160)
## Problem

`pg-clients` workflow looks different from the main `build-and-test`
workflow for historical reasons (it was my very first task at Neon, and 
back then I wasn't really familiar with the rest of the CI pipelines).
This PR unifies `pg-clients` workflow with `build-and-test`

## Summary of changes
- Rename `pg_clients.yml` to `pg-clients.yml`
- Run the workflow on changes in relevant files
- Create Allure report for tests
- Send slack notifications to `#on-call-qa-staging-stream` channel
(instead of `#on-call-staging-stream`)
- Update Client libraries once we're here
2024-07-04 14:58:01 +01:00
Arpad Müller
a004d27fca Use bool param for round_trip_test_compressed (#8252)
As per @koivunej 's request in
https://github.com/neondatabase/neon/pull/8238#discussion_r1663892091 ,
use a runtime param instead of monomorphizing the function based on the value.

Part of https://github.com/neondatabase/neon/issues/5431
2024-07-04 15:04:08 +02:00
Vlad Lazar
a46253766b pageserver: increase rate limit duration for layer visit log (#8263)
## Problem
I'd like to keep this in the tree since it might be useful in prod as
well. It's a bit too noisy as is and missing the lsn.

## Summary of changes
Add an lsn field and and increase the rate limit duration.
2024-07-04 13:22:33 +01:00
Alexander Bayandin
5b69b32dc5 CI(build-and-test): add conclusion job (#8246)
## Problem

Currently, if you need to rename a job and the job is listed in [branch
protection
rules](https://github.com/neondatabase/neon/settings/branch_protection_rules),
the PR won't be allowed to merge.

## Summary of changes
- Add `conclusion` job that fails if any of its dependencies don't
finish successfully
2024-07-04 09:20:01 +01:00
Conrad Ludgate
e03c3c9893 proxy: cache certain non-retriable console errors for a short time (#8201)
## Problem

If there's a quota error, it makes sense to cache it for a short window
of time. Many clients do not handle database connection errors
gracefully, so just spam retry 🤡

## Summary of changes

Updates the node_info cache to support storing console errors. Store
console errors if they cannot be retried (using our own heuristic.
should only trigger for quota exceeded errors).
2024-07-04 09:03:03 +01:00
Vlad Lazar
bbb2fa7cdd tests: perform graceful rolling restarts in storcon scale test (#8173)
## Problem
Scale test doesn't exercise drain & fill.

## Summary of changes
Make scale test exercise drain & fill
2024-07-04 06:04:19 +01:00
John Spray
778787d8e9 pageserver: add supplementary branch usage stats (#8131)
## Problem

The metrics we have today aren't convenient for planning around the
impact of timeline archival on costs.

Closes: https://github.com/neondatabase/neon/issues/8108

## Summary of changes

- Add metric `pageserver_archive_size`, which indicates the logical
bytes of data which we would expect to write into an archived branch.
- Add metric `pageserver_pitr_history_size`, which indicates the
distance between last_record_lsn and the PITR cutoff.

These metrics are somewhat temporary: when we implement #8088 and
associated consumption metric changes, these will reach a final form.
For now, an "archived" branch is just any branch outside of its parent's
PITR window: later, archival will become an explicit state (which will
_usually_ correspond to falling outside the parent's PITR window).

The overall volume of timeline metrics is something to watch, but we are
removing many more in https://github.com/neondatabase/neon/pull/8245
than this PR is adding.
2024-07-03 22:29:43 +01:00
Alex Chi Z
90b51dcf16 fix(pageserver): ensure test creates valid layer map (#8191)
I'd like to add some constraints to the layer map we generate in tests.

(1) is the layer map that the current compaction algorithm will produce.
There is a property that for all delta layer, all delta layer overlaps
with it on the LSN axis will have the same LSN range.
(2) is the layer map that cannot be produced with the legacy compaction
algorithm.
(3) is the layer map that will be produced by the future
tiered-compaction algorithm. The current validator does not allow that
but we can modify the algorithm to allow it in the future.

## Summary of changes

Add a validator to check if the layer map is valid and refactor the test
cases to include delta layer start/end LSN.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-07-03 18:46:58 +00:00
Christian Schwarz
a85aa03d18 page_service: stop exposing get_last_record_rlsn (#8244)
Compute doesn't use it, let's eliminate it.

Ref to Slack thread:
https://neondb.slack.com/archives/C033RQ5SPDH/p1719920261995529
2024-07-03 20:05:01 +02:00
Japin Li
cdaed4d79c Fix outdated comment (#8149)
Commit 97b48c23f changes the log wait timeout from 1 second to 100
milliseconds but forgets to update the comment.
2024-07-03 13:55:36 -04:00
John Spray
ea0b22a9b0 pageserver: reduce ops tracked at per-timeline detail (#8245)
## Problem

We record detailed histograms for all page_service op types, which
mostly aren't very interesting, but make our prometheus scrapes huge.

Closes: #8223 

## Summary of changes

- Only track GetPageAtLsn histograms on a per-timeline granularity. For
all other operation types, rely on existing node-wide histograms.
2024-07-03 17:27:34 +01:00
Peter Bendel
392a58bdce add pagebench test cases for periodic pagebench on dedicated hardware (#8233)
we want to run some specific pagebench test cases on dedicated hardware
to get reproducible results

run1: 1 client per tenant => characterize throughput with n tenants.
-  500 tenants
- scale 13 (200 MB database)
- 1 hour duration
- ca 380 GB layer snapshot files

run2.singleclient: 1 client per tenant => characterize latencies
run2.manyclient: N clients per tenant => characterize throughput
scalability within one tenant.
- 1 tenant with 1 client for latencies
- 1 tenant with 64 clients because typically for a high number of
connections we recommend the connection pooler
which by default uses 64 connections (for scalability)
- scale 136 (2048 MB database)
- 20 minutes each
2024-07-03 16:22:33 +00:00
Arpad Müller
e0891ec8c8 Only support compressed reads if the compression setting is present (#8238)
PR #8106 was created with the assumption that no blob is larger than
`256 MiB`. Due to #7852 we have checking for *writes* of blobs larger
than that limit, but we didn't have checking for *reads* of such large
blobs: in theory, we could be reading these blobs every day but we just
don't happen to write the blobs for some reason.

Therefore, we now add a warning for *reads* of such large blobs as well.

To make deploying compression less dangerous, we therefore only assume a
blob is compressed if the compression setting is present in the config.
This also means that we can't back out of compression once we enabled
it.

Part of https://github.com/neondatabase/neon/issues/5431
2024-07-03 18:02:10 +02:00
John Spray
97f7188a07 pageserver: don't try to flush if shutdown during attach (#8235)
## Problem

test_location_conf_churn fails on log errors when it tries to shutdown a
pageserver immediately after starting a tenant attach, like this:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8224/9761000525/index.html#/testresult/15fb6beca5c7327c

```
shutdown:shutdown{tenant_id=35f5c55eb34e7e5e12288c5d8ab8b909 shard_id=0000}:timeline_shutdown{timeline_id=30936747043353a98661735ad09cbbfe shutdown_mode=FreezeAndFlush}: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited\n')
```

This is happening because Tenant::shutdown fires its cancellation token
early if the tenant is not fully attached by the time shutdown is
called, so the flush loop is shutdown by the time we try and flush.

## Summary of changes

- In the early-cancellation case, also set the shutdown mode to Hard to
skip trying to do a flush that will fail.
2024-07-03 13:13:06 +00:00
99 changed files with 3011 additions and 1092 deletions

View File

@@ -30,7 +30,7 @@ jobs:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
github-event-name: ${{ github.event_name }}
cancel-previous-e2e-tests:
needs: [ check-permissions ]
@@ -1368,3 +1368,31 @@ jobs:
with:
from-tag: ${{ needs.build-build-tools-image.outputs.image-tag }}
secrets: inherit
# This job simplifies setting branch protection rules (in GitHub UI)
# by allowing to set only this job instead of listing many others.
# It also makes it easier to rename or parametrise jobs (using matrix)
# which requires changes in branch protection rules
#
# Note, that we can't add external check (like `neon-cloud-e2e`) we still need to use GitHub UI for that.
#
# https://github.com/neondatabase/neon/settings/branch_protection_rules
conclusion:
if: always()
# Format `needs` differently to make the list more readable.
# Usually we do `needs: [...]`
needs:
- check-codestyle-python
- check-codestyle-rust
- regress-tests
- test-images
runs-on: ubuntu-22.04
steps:
# The list of possible results:
# https://docs.github.com/en/actions/learn-github-actions/contexts#needs-context
- name: Fail the job if any of the dependencies do not succeed
run: exit 1
if: |
contains(needs.*.result, 'failure')
|| contains(needs.*.result, 'cancelled')
|| contains(needs.*.result, 'skipped')

155
.github/workflows/periodic_pagebench.yml vendored Normal file
View File

@@ -0,0 +1,155 @@
name: Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 18 * * *' # Runs at 6 PM UTC every day
workflow_dispatch: # Allows manual triggering of the workflow
inputs:
commit_hash:
type: string
description: 'The long neon repo commit hash for the system under test (pageserver) to be tested.'
required: false
default: ''
defaults:
run:
shell: bash -euo pipefail {0}
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
jobs:
trigger_bench_on_ec2_machine_in_eu_central_1:
runs-on: [ self-hosted, gen3, small ]
container:
image: neondatabase/build-tools:pinned
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
timeout-minutes: 360 # Set the timeout to 6 hours
env:
API_KEY: ${{ secrets.PERIODIC_PAGEBENCH_EC2_RUNNER_API_KEY }}
RUN_ID: ${{ github.run_id }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_EC2_US_TEST_RUNNER_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY : ${{ secrets.AWS_EC2_US_TEST_RUNNER_ACCESS_KEY_SECRET }}
AWS_DEFAULT_REGION : "eu-central-1"
AWS_INSTANCE_ID : "i-02a59a3bf86bc7e74"
steps:
# we don't need the neon source code because we run everything remotely
# however we still need the local github actions to run the allure step below
- uses: actions/checkout@v4
- name: Show my own (github runner) external IP address - usefull for IP allowlisting
run: curl https://ifconfig.me
- name: Start EC2 instance and wait for the instance to boot up
run: |
aws ec2 start-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-running --instance-ids $AWS_INSTANCE_ID
sleep 60 # sleep some time to allow cloudinit and our API server to start up
- name: Determine public IP of the EC2 instance and set env variable EC2_MACHINE_URL_US
run: |
public_ip=$(aws ec2 describe-instances --instance-ids $AWS_INSTANCE_ID --query 'Reservations[*].Instances[*].PublicIpAddress' --output text)
echo "Public IP of the EC2 instance: $public_ip"
echo "EC2_MACHINE_URL_US=https://${public_ip}:8443" >> $GITHUB_ENV
- name: Determine commit hash
env:
INPUT_COMMIT_HASH: ${{ github.event.inputs.commit_hash }}
run: |
if [ -z "$INPUT_COMMIT_HASH" ]; then
echo "COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')" >> $GITHUB_ENV
else
echo "COMMIT_HASH=$INPUT_COMMIT_HASH" >> $GITHUB_ENV
fi
- name: Start Bench with run_id
run: |
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/start_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\"}"
- name: Poll Test Status
id: poll_step
run: |
status=""
while [[ "$status" != "failure" && "$status" != "success" ]]; do
response=$(curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_status/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY")
echo "Response: $response"
set +x
status=$(echo $response | jq -r '.status')
echo "Test status: $status"
if [[ "$status" == "failure" ]]; then
echo "Test failed"
exit 1 # Fail the job step if status is failure
elif [[ "$status" == "success" || "$status" == "null" ]]; then
break
elif [[ "$status" == "too_many_runs" ]]; then
echo "Too many runs already running"
echo "too_many_runs=true" >> "$GITHUB_OUTPUT"
exit 1
fi
sleep 60 # Poll every 60 seconds
done
- name: Retrieve Test Logs
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_log/${GITHUB_RUN_ID}" \
-H 'accept: application/gzip' \
-H "Authorization: Bearer $API_KEY" \
--output "test_log_${GITHUB_RUN_ID}.gz"
- name: Unzip Test Log and Print it into this job's log
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
gzip -d "test_log_${GITHUB_RUN_ID}.gz"
cat "test_log_${GITHUB_RUN_ID}"
- name: Create Allure report
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic pagebench testing on dedicated hardware: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
- name: Cleanup Test Resources
if: always()
run: |
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/cleanup_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d ''
- name: Stop EC2 instance and wait for the instance to be stopped
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
aws ec2 stop-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-stopped --instance-ids $AWS_INSTANCE_ID

115
.github/workflows/pg-clients.yml vendored Normal file
View File

@@ -0,0 +1,115 @@
name: Test Postgres client libraries
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '23 02 * * *' # run once a day, timezone is utc
pull_request:
paths:
- '.github/workflows/pg-clients.yml'
- 'test_runner/pg_clients/**'
- 'poetry.lock'
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
defaults:
run:
shell: bash -euxo pipefail {0}
env:
DEFAULT_PG_VERSION: 16
PLATFORM: neon-captest-new
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
AWS_DEFAULT_REGION: eu-central-1
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name }}
check-build-tools-image:
needs: [ check-permissions ]
uses: ./.github/workflows/check-build-tools-image.yml
build-build-tools-image:
needs: [ check-build-tools-image ]
uses: ./.github/workflows/build-build-tools-image.yml
with:
image-tag: ${{ needs.check-build-tools-image.outputs.image-tag }}
secrets: inherit
test-postgres-client-libs:
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init --user root
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
with:
build_type: remote
test_selection: pg_clients
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
- name: Delete Neon Project
if: always()
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- name: Post to a Slack channel
if: github.event.schedule && failure()
uses: slackapi/slack-github-action@v1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Testing Postgres clients: <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|${{ job.status }}> (<${{ steps.create-allure-report.outputs.report-url }}|test report>)
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -1,98 +0,0 @@
name: Test Postgres client libraries
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '23 02 * * *' # run once a day, timezone is utc
workflow_dispatch:
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
test-postgres-client-libs:
# TODO: switch to gen2 runner, requires docker
runs-on: ubuntu-22.04
env:
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: 3.9
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Cache poetry deps
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
- name: Run pytest
env:
REMOTE_ENV: 1
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
shell: bash -euxo pipefail {0}
run: |
# Test framework expects we have psql binary;
# but since we don't really need it in this test, let's mock it
mkdir -p "$POSTGRES_DISTRIB_DIR/v${DEFAULT_PG_VERSION}/bin" && touch "$POSTGRES_DISTRIB_DIR/v${DEFAULT_PG_VERSION}/bin/psql";
./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
-m "remote_cluster" \
-rA "test_runner/pg_clients"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
# We use GitHub's action upload-artifact because `ubuntu-latest` doesn't have configured AWS CLI.
# It will be fixed after switching to gen2 runner
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v4
with:
retention-days: 7
name: python-test-pg_clients-${{ runner.os }}-${{ runner.arch }}-stage-logs
path: ${{ env.TEST_OUTPUT }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Testing Postgres clients: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -1,5 +1,13 @@
FROM debian:bullseye-slim
# Use ARG as a build-time environment variable here to allow.
# It's not supposed to be set outside.
# Alternatively it can be obtained using the following command
# ```
# . /etc/os-release && echo "${VERSION_CODENAME}"
# ```
ARG DEBIAN_VERSION_CODENAME=bullseye
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
SHELL ["/bin/bash", "-c"]
@@ -66,12 +74,24 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
# LLVM
ENV LLVM_VERSION=18
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/bullseye/ llvm-toolchain-bullseye-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION_CODENAME}/ llvm-toolchain-${DEBIAN_VERSION_CODENAME}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
&& apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION_CODENAME} stable" > /etc/apt/sources.list.d/docker.list \
&& apt update \
&& apt install -y docker-ce docker-ce-cli \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Configure sudo & docker
RUN usermod -aG sudo nonroot && \
echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
usermod -aG docker nonroot
# AWS CLI
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "awscliv2.zip" \
&& unzip -q awscliv2.zip \

View File

@@ -873,9 +873,8 @@ impl ComputeNode {
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");

View File

@@ -489,7 +489,7 @@ pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()>
/// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions:
/// - next line starts with timestamp
/// - EOF
/// - no new lines were written for the last second
/// - no new lines were written for the last 100 milliseconds
async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> {
let mut lines = tokio::io::BufReader::new(stderr).lines();
let timeout_duration = Duration::from_millis(100);

View File

@@ -9,6 +9,7 @@ use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};
@@ -228,6 +229,11 @@ pub struct TimelineCreateRequest {
pub pg_version: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct LsnLeaseRequest {
pub lsn: Lsn,
}
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitRequest {
pub new_shard_count: u8,
@@ -432,22 +438,49 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
)]
#[strum(serialize_all = "kebab-case")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImageCompressionAlgorithm {
/// Disabled for writes, and never decompress during reading.
/// Never set this after you've enabled compression once!
DisabledNoDecompress,
// Disabled for writes, support decompressing during read path
Disabled,
/// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
/// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
Zstd { level: Option<i8> },
Zstd {
level: Option<i8>,
},
}
impl ImageCompressionAlgorithm {
pub fn allow_decompression(&self) -> bool {
!matches!(self, ImageCompressionAlgorithm::DisabledNoDecompress)
}
}
impl FromStr for ImageCompressionAlgorithm {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut components = s.split(['(', ')']);
let first = components
.next()
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
match first {
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
"zstd" => {
let level = if let Some(v) = components.next() {
let v: i8 = v.parse()?;
Some(v)
} else {
None
};
Ok(ImageCompressionAlgorithm::Zstd { level })
}
_ => anyhow::bail!("invalid specifier '{first}'"),
}
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
@@ -661,6 +694,16 @@ pub struct TimelineInfo {
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<u64>,
/// How many bytes of WAL are within this branch's pitr_interval. If the pitr_interval goes
/// beyond the branch's branch point, we only count up to the branch point.
pub pitr_history_size: u64,
/// Whether this branch's branch point is within its ancestor's PITR interval (i.e. any
/// ancestor data used by this branch would have been retained anyway). If this is false, then
/// this branch may be imposing a cost on the ancestor by causing it to retain layers that it would
/// otherwise be able to GC.
pub within_ancestor_pitr: bool,
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,
@@ -1632,4 +1675,29 @@ mod tests {
AuxFilePolicy::CrossValidation
);
}
#[test]
fn test_image_compression_algorithm_parsing() {
use ImageCompressionAlgorithm::*;
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
Disabled
);
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
DisabledNoDecompress
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
Zstd { level: None }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
Zstd { level: Some(18) }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
Zstd { level: Some(-3) }
);
}
}

View File

@@ -34,10 +34,10 @@ struct SegmentSize {
}
struct SizeAlternatives {
// cheapest alternative if parent is available.
/// cheapest alternative if parent is available.
incremental: SegmentSize,
// cheapest alternative if parent node is not available
/// cheapest alternative if parent node is not available
non_incremental: Option<SegmentSize>,
}

View File

@@ -3,10 +3,17 @@ use std::fmt::Write;
const SVG_WIDTH: f32 = 500.0;
/// Different branch kind for SVG drawing.
#[derive(PartialEq)]
pub enum SvgBranchKind {
Timeline,
Lease,
}
struct SvgDraw<'a> {
storage: &'a StorageModel,
branches: &'a [String],
seg_to_branch: &'a [usize],
seg_to_branch: &'a [(usize, SvgBranchKind)],
sizes: &'a [SegmentSizeResult],
// layout
@@ -42,13 +49,18 @@ fn draw_legend(result: &mut String) -> anyhow::Result<()> {
"<line x1=\"5\" y1=\"70\" x2=\"15\" y2=\"70\" stroke-width=\"1\" stroke=\"gray\" />"
)?;
writeln!(result, "<text x=\"20\" y=\"75\">WAL not retained</text>")?;
writeln!(
result,
"<line x1=\"10\" y1=\"85\" x2=\"10\" y2=\"95\" stroke-width=\"3\" stroke=\"blue\" />"
)?;
writeln!(result, "<text x=\"20\" y=\"95\">LSN lease</text>")?;
Ok(())
}
pub fn draw_svg(
storage: &StorageModel,
branches: &[String],
seg_to_branch: &[usize],
seg_to_branch: &[(usize, SvgBranchKind)],
sizes: &SizeResult,
) -> anyhow::Result<String> {
let mut draw = SvgDraw {
@@ -100,7 +112,7 @@ impl<'a> SvgDraw<'a> {
// Layout the timelines on Y dimension.
// TODO
let mut y = 100.0;
let mut y = 120.0;
let mut branch_y_coordinates = Vec::new();
for _branch in self.branches {
branch_y_coordinates.push(y);
@@ -109,7 +121,7 @@ impl<'a> SvgDraw<'a> {
// Calculate coordinates for each point
let seg_coordinates = std::iter::zip(segments, self.seg_to_branch)
.map(|(seg, branch_id)| {
.map(|(seg, (branch_id, _))| {
let x = (seg.lsn - min_lsn) as f32 / xscale;
let y = branch_y_coordinates[*branch_id];
(x, y)
@@ -175,6 +187,22 @@ impl<'a> SvgDraw<'a> {
// draw a snapshot point if it's needed
let (coord_x, coord_y) = self.seg_coordinates[seg_id];
let (_, kind) = &self.seg_to_branch[seg_id];
if kind == &SvgBranchKind::Lease {
let (x1, y1) = (coord_x, coord_y - 10.0);
let (x2, y2) = (coord_x, coord_y + 10.0);
let style = "stroke-width=\"3\" stroke=\"blue\"";
writeln!(
result,
"<line x1=\"{x1}\" y1=\"{y1}\" x2=\"{x2}\" y2=\"{y2}\" {style}>",
)?;
writeln!(result, " <title>leased lsn at {}</title>", seg.lsn)?;
writeln!(result, "</line>")?;
}
if self.sizes[seg_id].method == SegmentMethod::SnapshotHere {
writeln!(
result,

View File

@@ -91,7 +91,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::DisabledNoDecompress;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
@@ -288,7 +289,7 @@ pub struct PageServerConf {
pub validate_vectored_get: bool,
pub image_compression: Option<ImageCompressionAlgorithm>,
pub image_compression: ImageCompressionAlgorithm,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
@@ -402,7 +403,7 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
image_compression: BuilderValue<ImageCompressionAlgorithm>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
@@ -680,7 +681,7 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
pub fn get_image_compression(&mut self, value: ImageCompressionAlgorithm) {
self.image_compression = BuilderValue::Set(value);
}
@@ -1028,7 +1029,7 @@ impl PageServerConf {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
builder.get_image_compression(parse_toml_from_str("image_compression", item)?)
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)

View File

@@ -190,7 +190,7 @@ where
}
} else {
// If we failed validation, then do not apply any of the projected updates
warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
info!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
}
}
@@ -225,7 +225,7 @@ where
&& (tenant.generation == *validated_generation);
if !this_list_valid {
warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
mutated = true;
} else {

View File

@@ -265,15 +265,19 @@ paths:
type: string
format: hex
post:
description: Obtain lease for the given LSN
parameters:
- name: lsn
in: query
required: true
schema:
type: string
format: hex
description: A LSN to obtain the lease for
description: Obtains a lease for the given LSN.
requestBody:
content:
application/json:
schema:
type: object
required:
- lsn
properties:
lsn:
description: A LSN to obtain the lease for.
type: string
format: hex
responses:
"200":
description: OK

View File

@@ -22,6 +22,7 @@ use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::LsnLeaseRequest;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
@@ -42,7 +43,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeTravelError;
use tenant_size_model::{SizeResult, StorageModel};
use tenant_size_model::{svg::SvgBranchKind, SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
@@ -406,6 +407,8 @@ async fn build_timeline_info_common(
let walreceiver_status = timeline.walreceiver_status();
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
@@ -426,6 +429,8 @@ async fn build_timeline_info_common(
directory_entries_counts: timeline.get_directory_metrics().to_vec(),
current_physical_size,
current_logical_size_non_incremental: None,
pitr_history_size,
within_ancestor_pitr,
timeline_dir_layer_file_size_sum: None,
wal_source_connstr,
last_received_msg_lsn,
@@ -1191,10 +1196,15 @@ fn synthetic_size_html_response(
timeline_map.insert(ti.timeline_id, index);
timeline_ids.push(ti.timeline_id.to_string());
}
let seg_to_branch: Vec<usize> = inputs
let seg_to_branch: Vec<(usize, SvgBranchKind)> = inputs
.segments
.iter()
.map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
.map(|seg| {
(
*timeline_map.get(&seg.timeline_id).unwrap(),
seg.kind.into(),
)
})
.collect();
let svg =
@@ -1527,15 +1537,13 @@ async fn handle_tenant_break(
// Obtains an lsn lease on the given timeline.
async fn lsn_lease_handler(
request: Request<Body>,
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
let lsn = json_request::<LsnLeaseRequest>(&mut request).await?.lsn;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

View File

@@ -8,7 +8,7 @@ use metrics::{
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, IntoEnumIterator, VariantNames};
use strum::{EnumCount, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;
@@ -464,6 +464,24 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_pitr_history_size",
"Data written since PITR cutoff on this timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static TIMELINE_ARCHIVE_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_archive_size",
"Timeline's logical size if it is considered eligible for archival (outside PITR window), else zero",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_standby_horizon",
@@ -482,6 +500,15 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static VISIBLE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_visible_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_resident_physical_size_global",
@@ -1076,21 +1103,12 @@ pub(crate) mod virtual_file_io_engine {
});
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,
per_tenant_timeline: Histogram,
}
impl GlobalAndPerTimelineHistogram {
fn observe(&self, value: f64) {
self.global.observe(value);
self.per_tenant_timeline.observe(value);
}
}
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
h: &'a GlobalAndPerTimelineHistogram,
global_metric: &'a Histogram,
// Optional because not all op types are tracked per-timeline
timeline_metric: Option<&'a Histogram>,
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
@@ -1121,7 +1139,10 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.h.observe(ex_throttled.as_secs_f64());
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
@@ -1146,7 +1167,8 @@ pub enum SmgrQueryType {
#[derive(Debug)]
pub(crate) struct SmgrQueryTimePerTimeline {
metrics: [GlobalAndPerTimelineHistogram; SmgrQueryType::COUNT],
global_metrics: [Histogram; SmgrQueryType::COUNT],
per_timeline_getpage: Histogram,
}
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
@@ -1224,27 +1246,32 @@ impl SmgrQueryTimePerTimeline {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_slug = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id.to_string();
let metrics = std::array::from_fn(|i| {
let global_metrics = std::array::from_fn(|i| {
let op = SmgrQueryType::from_repr(i).unwrap();
let global = SMGR_QUERY_TIME_GLOBAL
SMGR_QUERY_TIME_GLOBAL
.get_metric_with_label_values(&[op.into()])
.unwrap();
let per_tenant_timeline = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[op.into(), &tenant_id, &shard_slug, &timeline_id])
.unwrap();
GlobalAndPerTimelineHistogram {
global,
per_tenant_timeline,
}
.unwrap()
});
Self { metrics }
let per_timeline_getpage = SMGR_QUERY_TIME_PER_TENANT_TIMELINE
.get_metric_with_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
&tenant_id,
&shard_slug,
&timeline_id,
])
.unwrap();
Self {
global_metrics,
per_timeline_getpage,
}
}
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> impl Drop + '_ {
let metric = &self.metrics[op as usize];
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
@@ -1263,12 +1290,20 @@ impl SmgrQueryTimePerTimeline {
});
}
}
GlobalAndPerTimelineHistogramTimer {
h: metric,
let timeline_metric = if matches!(op, SmgrQueryType::GetPageAtLsn) {
Some(&self.per_timeline_getpage)
} else {
None
};
Some(GlobalAndPerTimelineHistogramTimer {
global_metric,
timeline_metric,
ctx,
start,
op,
}
})
}
}
@@ -1315,17 +1350,9 @@ mod smgr_query_time_tests {
let get_counts = || {
let global: u64 = ops
.iter()
.map(|op| metrics.metrics[*op as usize].global.get_sample_count())
.map(|op| metrics.global_metrics[*op as usize].get_sample_count())
.sum();
let per_tenant_timeline: u64 = ops
.iter()
.map(|op| {
metrics.metrics[*op as usize]
.per_tenant_timeline
.get_sample_count()
})
.sum();
(global, per_tenant_timeline)
(global, metrics.per_timeline_getpage.get_sample_count())
};
let (pre_global, pre_per_tenant_timeline) = get_counts();
@@ -1336,7 +1363,12 @@ mod smgr_query_time_tests {
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();
assert_eq!(post_per_tenant_timeline, 1);
if matches!(op, super::SmgrQueryType::GetPageAtLsn) {
// getpage ops are tracked per-timeline, others aren't
assert_eq!(post_per_tenant_timeline, 1);
} else {
assert_eq!(post_per_tenant_timeline, 0);
}
assert!(post_global > pre_global);
}
}
@@ -1433,10 +1465,12 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
}
}
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
"Number of live network connections",
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_live_connections_started",
"Number of network connections that we started handling",
"pageserver_live_connections_finished",
"Number of network connections that we finished handling",
&["pageserver_connection_kind"]
)
.expect("failed to define a metric")
@@ -1447,7 +1481,6 @@ pub(crate) enum ComputeCommandKind {
PageStreamV2,
PageStream,
Basebackup,
GetLastRecordRlsn,
Fullbackup,
ImportBasebackup,
ImportWal,
@@ -2102,8 +2135,11 @@ pub(crate) struct TimelineMetrics {
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
@@ -2175,12 +2211,24 @@ impl TimelineMetrics {
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let pitr_history_size = PITR_HISTORY_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let archival_size = TIMELINE_ARCHIVE_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let standby_horizon_gauge = STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO: we shouldn't expose this metric
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
@@ -2227,8 +2275,11 @@ impl TimelineMetrics {
find_gc_cutoffs_histo,
load_layer_map_histo,
last_record_gauge,
pitr_history_size,
archival_size,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
@@ -2280,10 +2331,15 @@ impl TimelineMetrics {
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = TIMELINE_ARCHIVE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = PITR_HISTORY_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = AUX_FILE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = VALID_LSN_LEASE_COUNT.remove_label_values(&[tenant_id, shard_id, timeline_id]);
@@ -2317,14 +2373,12 @@ impl TimelineMetrics {
let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
}
for op in SmgrQueryType::iter() {
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
op.into(),
tenant_id,
shard_id,
timeline_id,
]);
}
let _ = SMGR_QUERY_TIME_PER_TENANT_TIMELINE.remove_label_values(&[
SmgrQueryType::GetPageAtLsn.into(),
tenant_id,
shard_id,
timeline_id,
]);
}
}

View File

@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
@@ -215,14 +215,9 @@ async fn page_service_conn_main(
auth_type: AuthType,
connection_ctx: RequestContext,
) -> anyhow::Result<()> {
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
.guard();
socket
.set_nodelay(true)
@@ -1656,53 +1651,6 @@ where
metric_recording.observe(&res);
res?;
}
// return pair of prev_lsn and last_lsn
else if let Some(params) = parts.strip_prefix(&["get_last_record_rlsn"]) {
if params.len() != 2 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for get_last_record_rlsn command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::GetLastRecordRlsn)
.inc();
async {
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let end_of_timeline = timeline.get_last_record_rlsn();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::text_col(b"prev_lsn"),
RowDescriptor::text_col(b"last_lsn"),
]))?
.write_message_noflush(&BeMessage::DataRow(&[
Some(end_of_timeline.prev.to_string().as_bytes()),
Some(end_of_timeline.last.to_string().as_bytes()),
]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
anyhow::Ok(())
}
.instrument(info_span!(
"handle_get_last_record_lsn",
shard_id = tracing::field::Empty
))
.await?;
}
// same as basebackup, but result includes relational data as well
else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
if params.len() < 2 {

View File

@@ -931,7 +931,7 @@ impl Timeline {
result.to_keyspace(),
/* AUX sparse key space */
SparseKeySpace(KeySpace {
ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
}),
))
}

View File

@@ -19,6 +19,7 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::TimelineState;
@@ -30,6 +31,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
@@ -92,14 +94,12 @@ use crate::tenant::storage_layer::ImageLayer;
use crate::walredo;
use crate::InitializationOrder;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::ops::Bound::Included;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -632,6 +632,11 @@ impl Tenant {
timeline.maybe_spawn_flush_loop();
}
}
if let Some(ancestor) = timeline.get_ancestor_timeline() {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
ancestor_gc_info.insert_child(timeline.timeline_id, timeline.get_ancestor_lsn());
}
};
// Sanity check: a timeline should have some content.
@@ -1365,7 +1370,7 @@ impl Tenant {
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
@@ -1733,6 +1738,9 @@ impl Tenant {
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, background_jobs_can_start);
@@ -1816,9 +1824,15 @@ impl Tenant {
// If we're still attaching, fire the cancellation token early to drop out: this
// will prevent us flushing, but ensures timely shutdown if some I/O during attach
// is very slow.
if matches!(self.current_state(), TenantState::Attaching) {
let shutdown_mode = if matches!(self.current_state(), TenantState::Attaching) {
self.cancel.cancel();
}
// Having fired our cancellation token, do not try and flush timelines: their cancellation tokens
// are children of ours, so their flush loops will have shut down already
timeline::ShutdownMode::Hard
} else {
shutdown_mode
};
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
@@ -2759,6 +2773,56 @@ impl Tenant {
.await
}
/// Populate all Timelines' `GcInfo` with information about their children. We do not set the
/// PITR cutoffs here, because that requires I/O: this is done later, before GC, by [`Self::refresh_gc_info_internal`]
///
/// Subsequently, parent-child relationships are updated incrementally during timeline creation/deletion.
fn initialize_gc_info(
&self,
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) {
// This function must be called before activation: after activation timeline create/delete operations
// might happen, and this function is not safe to run concurrently with those.
assert!(!self.is_active());
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, Option<KeySpace>)>> =
BTreeMap::new();
timelines.iter().for_each(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id, None));
}
});
// The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
let horizon = self.get_gc_horizon();
// Populate each timeline's GcInfo with information about its child branches
for timeline in timelines.values() {
let mut branchpoints: Vec<(Lsn, TimelineId, Option<KeySpace>)> = all_branchpoints
.remove(&timeline.timeline_id)
.unwrap_or_default();
branchpoints.sort_by_key(|b| b.0);
let mut target = timeline.gc_info.write().unwrap();
target.retain_lsns = branchpoints;
let horizon_cutoff = timeline
.get_last_record_lsn()
.checked_sub(horizon)
.unwrap_or(Lsn(0));
target.cutoffs = GcCutoffs {
horizon: horizon_cutoff,
pitr: Lsn::INVALID,
};
}
}
async fn refresh_gc_info_internal(
&self,
target_timeline_id: Option<TimelineId>,
@@ -2781,6 +2845,11 @@ impl Tenant {
.cloned()
.collect::<Vec<_>>();
if target_timeline_id.is_some() && timelines.is_empty() {
// We were to act on a particular timeline and it wasn't found
return Err(GcError::TimelineNotFound);
}
let mut gc_cutoffs: HashMap<TimelineId, GcCutoffs> =
HashMap::with_capacity(timelines.len());
@@ -2803,71 +2872,19 @@ impl Tenant {
// because that will stall branch creation.
let gc_cs = self.gc_cs.lock().await;
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let (all_branchpoints, timelines): (BTreeSet<(TimelineId, Lsn)>, _) = {
let timelines = self.timelines.lock().unwrap();
let mut all_branchpoints = BTreeSet::new();
let timelines = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
return Err(GcError::TimelineNotFound);
}
};
timelines
.iter()
.map(|(_timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) =
&timeline_entry.get_ancestor_timeline_id()
{
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timeline_id) = target_timeline_id {
if ancestor_timeline_id == &timeline_id {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
timeline_entry.clone()
})
.collect::<Vec<_>>()
};
(all_branchpoints, timelines)
};
// Ok, we now know all the branch points.
// Update the GC information for each timeline.
let mut gc_timelines = Vec::with_capacity(timelines.len());
for timeline in timelines {
// If target_timeline is specified, ignore all other timelines
// We filtered the timeline list above
if let Some(target_timeline_id) = target_timeline_id {
if timeline.timeline_id != target_timeline_id {
continue;
}
assert_eq!(target_timeline_id, timeline.timeline_id);
}
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timeline.timeline_id, Lsn(0))),
Included((timeline.timeline_id, Lsn(u64::MAX))),
))
.map(|&x| x.1)
.collect();
{
let mut target = timeline.gc_info.write().unwrap();
// Cull any expired leases
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
@@ -2876,20 +2893,37 @@ impl Tenant {
.valid_lsn_lease_count_gauge
.set(target.leases.len() as u64);
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
// Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
target.within_ancestor_pitr =
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.pitr;
}
None => {
// reasons for this being unavailable:
// - this timeline was created while we were finding cutoffs
// - lsn for timestamp search fails for this timeline repeatedly
//
// in both cases, refreshing the branchpoints is correct.
target.retain_lsns = branchpoints;
}
};
}
// Update metrics that depend on GC state
timeline
.metrics
.archival_size
.set(if target.within_ancestor_pitr {
timeline.metrics.current_logical_size_gauge.get()
} else {
0
});
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(target.cutoffs.pitr)
.unwrap_or(Lsn(0))
.0,
);
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
// - this timeline was created while we were finding cutoffs
// - lsn for timestamp search fails for this timeline repeatedly
if let Some(cutoffs) = gc_cutoffs.remove(&timeline.timeline_id) {
target.cutoffs = cutoffs;
}
}
gc_timelines.push(timeline);
@@ -2927,7 +2961,7 @@ impl Tenant {
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
ctx: &RequestContext,
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
@@ -3927,7 +3961,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::GcInfo;
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
@@ -4273,7 +4307,7 @@ mod tests {
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID, None));
}
// You can read the key from the child branch even though the parent is
@@ -6223,27 +6257,6 @@ mod tests {
.await
.unwrap();
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
let lsn = Lsn(0x30);
// test vectored get on parent timeline
@@ -6319,27 +6332,6 @@ mod tests {
.await
.unwrap();
async fn get_vectored_impl_wrapper(
tline: &Arc<Timeline>,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap()
}))
}
let lsn = Lsn(0x30);
// test vectored get on parent timeline
@@ -6415,9 +6407,18 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
],
// image layers
vec![
@@ -6483,17 +6484,29 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
vec![
(key0, Lsn(0x30), Value::Image(test_img("metadata key 0"))),
(key3, Lsn(0x30), Value::Image(test_img("metadata key 3"))),
],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x30)..Lsn(0x40),
vec![
(key0, Lsn(0x30), Value::Image(test_img("metadata key 0"))),
(key3, Lsn(0x30), Value::Image(test_img("metadata key 3"))),
],
),
],
// image layers
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
Lsn(0x30),
Lsn(0x40),
)
.await
.unwrap();
@@ -6516,7 +6529,7 @@ mod tests {
// Image layers are created at last_record_lsn
let images = tline
.inspect_image_layers(Lsn(0x30), &ctx)
.inspect_image_layers(Lsn(0x40), &ctx)
.await
.unwrap()
.into_iter()
@@ -6542,9 +6555,18 @@ mod tests {
&ctx,
// delta layers
vec![
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
),
DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x20)..Lsn(0x30),
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
),
],
// image layers
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
@@ -6592,15 +6614,21 @@ mod tests {
key
}
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
// We create
// - one bottom-most image layer,
// - a delta layer D1 crossing the GC horizon with data below and above the horizon,
// - a delta layer D2 crossing the GC horizon with data only below the horizon,
// - a delta layer D3 above the horizon.
//
// | D1 | | D3 |
// | D3 |
// | D1 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// | D3 |
// | Part of D1 |
// --------- img layer with D1+D2 at GC horizon------------------
// img layer at 0x10
@@ -6640,13 +6668,13 @@ mod tests {
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::Image(Bytes::from("value 8@0x40")),
Lsn(0x48),
Value::Image(Bytes::from("value 8@0x48")),
),
(
get_key(9),
Lsn(0x40),
Value::Image(Bytes::from("value 9@0x40")),
Lsn(0x48),
Value::Image(Bytes::from("value 9@0x48")),
),
];
@@ -6656,7 +6684,11 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
@@ -6677,8 +6709,8 @@ mod tests {
Bytes::from_static(b"value 5@0x20"),
Bytes::from_static(b"value 6@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x40"),
Bytes::from_static(b"value 9@0x40"),
Bytes::from_static(b"value 8@0x48"),
Bytes::from_static(b"value 9@0x48"),
];
for (idx, expected) in expected_result.iter().enumerate() {
@@ -6766,10 +6798,10 @@ mod tests {
lsn_range: Lsn(0x30)..Lsn(0x41),
is_delta: true
},
// The delta layer we created and should not be picked for the compaction
// The delta3 layer that should not be picked for the compaction
PersistentLayerKey {
key_range: get_key(8)..get_key(10),
lsn_range: Lsn(0x40)..Lsn(0x41),
lsn_range: Lsn(0x48)..Lsn(0x50),
is_delta: true
}
]
@@ -6833,7 +6865,10 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x40),
delta1,
)], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
@@ -6957,15 +6992,21 @@ mod tests {
key
}
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
// We create
// - one bottom-most image layer,
// - a delta layer D1 crossing the GC horizon with data below and above the horizon,
// - a delta layer D2 crossing the GC horizon with data only below the horizon,
// - a delta layer D3 above the horizon.
//
// | D1 | | D3 |
// | D3 |
// | D1 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// | D3 |
// | Part of D1 |
// --------- img layer with D1+D2 at GC horizon------------------
// img layer at 0x10
@@ -7015,13 +7056,13 @@ mod tests {
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
(
get_key(9),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
];
@@ -7031,7 +7072,11 @@ mod tests {
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
@@ -7046,6 +7091,7 @@ mod tests {
horizon: Lsn(0x30),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
@@ -7058,8 +7104,8 @@ mod tests {
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x40"),
Bytes::from_static(b"value 9@0x10@0x40"),
Bytes::from_static(b"value 8@0x10@0x48"),
Bytes::from_static(b"value 9@0x10@0x48"),
];
let expected_result_at_gc_horizon = [

View File

@@ -19,6 +19,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -72,14 +73,22 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
let bit_mask = if self.read_compressed {
!LEN_COMPRESSION_BIT_MASK
} else {
0x7f
};
len_buf[0] &= bit_mask;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
if compression_bits > BYTE_UNCOMPRESSED {
warn!("reading key above future limit ({len} bytes)");
}
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD {
@@ -264,7 +273,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_maybe_compressed(srcbuf, ctx, None).await
self.write_blob_maybe_compressed(
srcbuf,
ctx,
ImageCompressionAlgorithm::DisabledNoDecompress,
)
.await
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -273,7 +287,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
algorithm: ImageCompressionAlgorithm,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -305,7 +319,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
);
}
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ImageCompressionAlgorithm::Zstd { level }) => {
ImageCompressionAlgorithm::Zstd { level } => {
let mut encoder = if let Some(level) = level {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
@@ -326,7 +340,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
ImageCompressionAlgorithm::Disabled
| ImageCompressionAlgorithm::DisabledNoDecompress => {
(BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
}
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
@@ -384,11 +401,12 @@ mod tests {
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
round_trip_test_compressed::<BUFFERED>(blobs, false).await
}
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
async fn round_trip_test_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
@@ -400,17 +418,15 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
let (_, res) = if compression {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await
} else {
wtr.write_blob(blob.clone(), &ctx).await
};
let offs = res?;
offsets.push(offs);
@@ -425,7 +441,7 @@ mod tests {
let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
@@ -459,6 +475,8 @@ mod tests {
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
Ok(())
}
@@ -474,8 +492,8 @@ mod tests {
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
Ok(())
}

View File

@@ -149,16 +149,24 @@ impl<'a> BlockReaderRef<'a> {
/// ```
///
pub struct BlockCursor<'a> {
pub(super) read_compressed: bool,
reader: BlockReaderRef<'a>,
}
impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
BlockCursor { reader }
Self::new_with_compression(reader, false)
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
BlockCursor {
read_compressed,
reader,
}
}
// Needed by cli
pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
read_compressed: false,
reader: BlockReaderRef::FileBlockReader(reader),
}
}
@@ -188,11 +196,25 @@ pub struct FileBlockReader<'a> {
/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,
compressed_reads: bool,
}
impl<'a> FileBlockReader<'a> {
pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
FileBlockReader { file_id, file }
Self::new_with_compression(file, file_id, false)
}
pub fn new_with_compression(
file: &'a VirtualFile,
file_id: FileId,
compressed_reads: bool,
) -> Self {
FileBlockReader {
file_id,
file,
compressed_reads,
}
}
/// Read a page from the underlying file into given buffer.
@@ -239,7 +261,10 @@ impl<'a> FileBlockReader<'a> {
impl BlockReader for FileBlockReader<'_> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReader(self))
BlockCursor::new_with_compression(
BlockReaderRef::FileBlockReader(self),
self.compressed_reads,
)
}
}

View File

@@ -51,7 +51,7 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum, KeySpaceRandomAccum};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
@@ -61,7 +61,7 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::{LayerVisibility, PersistentLayerDesc};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -870,6 +870,164 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
/// `read_points` represent the tip of a timeline and any branch points, i.e. the places
/// where we expect to serve reads.
///
/// This function is O(N) and should be called infrequently. The caller is responsible for
/// looking up and updating the Layer objects for these layer descriptors.
pub(crate) fn get_visibility(
&self,
mut read_points: Vec<(Lsn, KeySpace)>,
) -> (Vec<(Arc<PersistentLayerDesc>, LayerVisibility)>, KeySpace) {
// This is like a KeySpace, but written for efficient subtraction of layers and unions with KeySpaces
struct KeyShadow {
// FIXME: consider efficiency. KeySpace is a flat vector, so in principle fairly inefficient for
// repeatedly calling contains(), BUT as we iterate through the layermap we expect the shadow to shrink
// to something quite small, and for small collections an algorithmically expensive vector is often better
// for performance than a more algorithmically cheap data structure.
inner: KeySpace,
}
impl KeyShadow {
fn new(keyspace: KeySpace) -> Self {
Self { inner: keyspace }
}
fn contains(&self, range: Range<Key>) -> bool {
self.inner.overlaps(&range)
}
/// Return true if anything was removed.
fn subtract(&mut self, range: Range<Key>) -> bool {
let removed = self.inner.remove_overlapping_with(&KeySpace {
ranges: vec![range],
});
!removed.ranges.is_empty()
}
fn union_with(&mut self, keyspace: KeySpace) {
let mut accum = KeySpaceRandomAccum::new();
let prev = std::mem::take(&mut self.inner);
accum.add_keyspace(prev);
accum.add_keyspace(keyspace);
self.inner = accum.to_keyspace();
}
}
// The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
// and a ReadPoint
read_points.sort_by_key(|rp| rp.0);
let mut shadow = KeyShadow::new(
read_points
.pop()
.expect("Every timeline has at least one read point")
.1,
);
// We will interleave all our read points and layers into a sorted collection
enum Item {
ReadPoint { lsn: Lsn, keyspace: KeySpace },
Layer(Arc<PersistentLayerDesc>),
}
let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
items.extend(self.iter_historic_layers().map(Item::Layer));
items.extend(read_points.into_iter().map(|rp| Item::ReadPoint {
lsn: rp.0,
keyspace: rp.1,
}));
// Ordering: we want to iterate like this:
// 1. Highest LSNs first
// 2. Consider ReadPoints before image layers if they're at the same LSN
items.sort_by_key(|item| {
std::cmp::Reverse(match item {
Item::ReadPoint {
lsn,
keyspace: _keyspace,
} => (*lsn, 0),
Item::Layer(layer) => {
if layer.is_delta() {
(layer.get_lsn_range().end, 1)
} else {
(layer.image_layer_lsn(), 2)
}
}
})
});
let mut results = Vec::with_capacity(self.historic.len());
// TODO: handle delta layers properly with multiple read points: if a read point intersects a delta layer, we might already
// have encountered it and marked it as not-visible. We need to keep track of which delta layers we are currently within, and
// when we encounter a ReadPoint, update the delta layer's visibility as needed.
// let mut pending_delta : Vec= ...
let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
for item in items {
let (reached_lsn, is_readpoint) = match &item {
Item::ReadPoint {
lsn,
keyspace: _keyspace,
} => (lsn, true),
Item::Layer(layer) => (&layer.lsn_range.start, false),
};
maybe_covered_deltas.retain(|d| {
if *reached_lsn >= d.lsn_range.start && is_readpoint {
// We encountered a readpoint within the delta layer: it is visible
results.push((d.clone(), LayerVisibility::Visible));
false
} else if *reached_lsn < d.lsn_range.start {
// We passed the layer's range without encountering a read point: it is not visible
results.push((d.clone(), LayerVisibility::Covered));
false
} else {
// We're still in the delta layer: continue iterating
true
}
});
match item {
Item::ReadPoint {
lsn: _lsn,
keyspace,
} => {
shadow.union_with(keyspace);
}
Item::Layer(layer) => {
let visibility = if layer.is_delta() {
if shadow.contains(layer.get_key_range()) {
LayerVisibility::Visible
} else {
// If a layer isn't visible based on current state, we must defer deciding whether
// it is truly not visible until we have advanced past the delta's range: we might
// encounter another branch point within this delta layer's LSN range.
maybe_covered_deltas.push(layer);
continue;
}
} else if shadow.subtract(layer.get_key_range()) {
// An image layer, which overlapped with the shadow
LayerVisibility::Visible
} else {
// An image layer, which did not overlap with the shadow
LayerVisibility::Covered
};
results.push((layer, visibility));
}
}
}
// Drain any remaining maybe_covered deltas
results.extend(
maybe_covered_deltas
.into_iter()
.map(|d| (d, LayerVisibility::Covered)),
);
(results, shadow.inner)
}
}
#[cfg(test)]

View File

@@ -521,6 +521,10 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
Ok(&self.historic_coverage)
}
pub(crate) fn len(&self) -> usize {
self.layers.len()
}
}
#[test]

View File

@@ -3,6 +3,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tenant_size_model::svg::SvgBranchKind;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -87,6 +88,9 @@ impl SegmentMeta {
LsnKind::BranchPoint => true,
LsnKind::GcCutOff => true,
LsnKind::BranchEnd => false,
LsnKind::LeasePoint => true,
LsnKind::LeaseStart => false,
LsnKind::LeaseEnd => false,
}
}
}
@@ -103,6 +107,21 @@ pub enum LsnKind {
GcCutOff,
/// Last record LSN
BranchEnd,
/// A LSN lease is granted here.
LeasePoint,
/// A lease starts from here.
LeaseStart,
/// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
LeaseEnd,
}
impl From<LsnKind> for SvgBranchKind {
fn from(kind: LsnKind) -> Self {
match kind {
LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
_ => SvgBranchKind::Timeline,
}
}
}
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
@@ -124,6 +143,9 @@ pub struct TimelineInputs {
/// Cutoff point calculated from the user-supplied 'max_retention_period'
retention_param_cutoff: Option<Lsn>,
/// Lease points on the timeline
lease_points: Vec<Lsn>,
}
/// Gathers the inputs for the tenant sizing model.
@@ -234,6 +256,13 @@ pub(super) async fn gather_inputs(
None
};
let lease_points = gc_info
.leases
.keys()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
.collect::<Vec<_>>();
// next_gc_cutoff in parent branch are not of interest (right now at least), nor do we
// want to query any logical size before initdb_lsn.
let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
@@ -242,12 +271,18 @@ pub(super) async fn gather_inputs(
let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
.retain_lsns
.iter()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
// this assumes there are no other retain_lsns than the branchpoints
.map(|lsn| (lsn, LsnKind::BranchPoint))
.filter_map(|(lsn, _child_id, _)| {
if lsn > &ancestor_lsn {
// this assumes there are no other retain_lsns than the branchpoints
Some((*lsn, LsnKind::BranchPoint))
} else {
None
}
})
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
drop(gc_info);
// Add branch points we collected earlier, just in case there were any that were
@@ -296,6 +331,7 @@ pub(super) async fn gather_inputs(
if kind == LsnKind::BranchPoint {
branchpoint_segments.insert((timeline_id, lsn), segments.len());
}
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
@@ -306,7 +342,45 @@ pub(super) async fn gather_inputs(
timeline_id: timeline.timeline_id,
kind,
});
parent += 1;
parent = segments.len() - 1;
if kind == LsnKind::LeasePoint {
// Needs `LeaseStart` and `LeaseEnd` as well to model lease as a read-only branch that never writes data
// (i.e. it's lsn has not advanced from ancestor_lsn), and therefore the three segments have the same LSN
// value. Without the other two segments, the calculation code would not count the leased LSN as a point
// to be retained.
// Did not use `BranchStart` or `BranchEnd` so we can differentiate branches and leases during debug.
//
// Alt Design: rewrite the entire calculation code to be independent of timeline id. Both leases and
// branch points can be given a synthetic id so we can unite them.
let mut lease_parent = parent;
// Start of a lease.
segments.push(SegmentMeta {
segment: Segment {
parent: Some(lease_parent),
lsn: lsn.0,
size: None, // Filled in later, if necessary
needed: lsn > next_gc_cutoff, // only needed if the point is within rentention.
},
timeline_id: timeline.timeline_id,
kind: LsnKind::LeaseStart,
});
lease_parent += 1;
// End of the lease.
segments.push(SegmentMeta {
segment: Segment {
parent: Some(lease_parent),
lsn: lsn.0,
size: None, // Filled in later, if necessary
needed: true, // everything at the lease LSN must be readable => is needed
},
timeline_id: timeline.timeline_id,
kind: LsnKind::LeaseEnd,
});
}
}
// Current end of the timeline
@@ -332,6 +406,7 @@ pub(super) async fn gather_inputs(
pitr_cutoff,
next_gc_cutoff,
retention_param_cutoff,
lease_points,
});
}
@@ -674,7 +749,8 @@ fn verify_size_for_multiple_branches() {
"horizon_cutoff": "0/2210CD0",
"pitr_cutoff": "0/2210CD0",
"next_gc_cutoff": "0/2210CD0",
"retention_param_cutoff": null
"retention_param_cutoff": null,
"lease_points": []
},
{
"timeline_id": "454626700469f0a9914949b9d018e876",
@@ -684,7 +760,8 @@ fn verify_size_for_multiple_branches() {
"horizon_cutoff": "0/1817770",
"pitr_cutoff": "0/1817770",
"next_gc_cutoff": "0/1817770",
"retention_param_cutoff": null
"retention_param_cutoff": null,
"lease_points": []
},
{
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
@@ -694,7 +771,8 @@ fn verify_size_for_multiple_branches() {
"horizon_cutoff": "0/18B3D98",
"pitr_cutoff": "0/18B3D98",
"next_gc_cutoff": "0/18B3D98",
"retention_param_cutoff": null
"retention_param_cutoff": null,
"lease_points": []
}
]
}
@@ -749,7 +827,8 @@ fn verify_size_for_one_branch() {
"horizon_cutoff": "47/240A5860",
"pitr_cutoff": "47/240A5860",
"next_gc_cutoff": "47/240A5860",
"retention_param_cutoff": "0/0"
"retention_param_cutoff": "0/0",
"lease_points": []
}
]
}"#;

View File

@@ -457,6 +457,26 @@ pub enum ValueReconstructResult {
Missing,
}
#[derive(Debug, Clone)]
pub(crate) enum LayerVisibility {
/// A Visible layer might be read while serving a read, because there is not an image layer between it
/// and a readable LSN (the tip of the branch or a child's branch point)
Visible,
/// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
/// a branch or ephemeral endpoint at an LSN below the layer that covers this.
Covered,
/// Calculating layer visibilty requires I/O, so until this has happened layers are loaded
/// in this state. Note that newly written layers may be called Visible immediately, this uninitialized
/// state is for when existing layers are constructed while loading a timeline.
Uninitialized,
}
impl Default for LayerVisibility {
fn default() -> Self {
Self::Uninitialized
}
}
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
@@ -468,6 +488,7 @@ pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
struct LayerAccessStatsLocked {
for_scraping_api: LayerAccessStatsInner,
for_eviction_policy: LayerAccessStatsInner,
visibility: LayerVisibility,
}
impl LayerAccessStatsLocked {
@@ -591,7 +612,13 @@ impl LayerAccessStats {
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= ctx.task_kind();
inner.last_accesses.write(this_access);
})
});
// We may access a layer marked as Covered, if a new branch was created that depends on
// this layer, and background updates to layer visibility didn't notice it yet
if !matches!(locked.visibility, LayerVisibility::Visible) {
locked.visibility = LayerVisibility::Visible;
}
}
fn as_api_model(
@@ -673,6 +700,28 @@ impl LayerAccessStats {
},
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibility) {
self.0.lock().unwrap().visibility = visibility;
}
pub(crate) fn get_visibility(&self) -> LayerVisibility {
self.0.lock().unwrap().visibility.clone()
}
/// Summarize how likely this layer is to be used: its access time (if accessed), and its visibility hint.
pub(crate) fn atime_visibility(&self) -> (Option<SystemTime>, LayerVisibility) {
let state = self.0.lock().unwrap();
(
state
.for_eviction_policy
.last_accesses
.recent()
.map(|a| a.when),
state.visibility.clone(),
)
}
}
/// Get a layer descriptor from a layer.

View File

@@ -49,7 +49,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
@@ -453,7 +453,7 @@ impl DeltaLayerWriterInner {
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
// We don't want to use compression in delta layer creation
let compression = None;
let compression = ImageCompressionAlgorithm::DisabledNoDecompress;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)

View File

@@ -165,6 +165,7 @@ pub struct ImageLayerInner {
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
compressed_reads: bool,
}
impl std::fmt::Debug for ImageLayerInner {
@@ -178,7 +179,8 @@ impl std::fmt::Debug for ImageLayerInner {
impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
@@ -266,9 +268,10 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
.await
.and_then(|res| res)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -377,6 +380,7 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
support_compressed_reads: bool,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
@@ -420,6 +424,7 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
compressed_reads: support_compressed_reads,
key_range: actual_summary.key_range,
}))
}
@@ -430,7 +435,8 @@ impl ImageLayerInner {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
@@ -490,12 +496,14 @@ impl ImageLayerInner {
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut result = Vec::new();
let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let cursor = block_reader.block_cursor();
while let Some(item) = stream.next().await {
// TODO: dedup code with get_reconstruct_value
@@ -530,7 +538,8 @@ impl ImageLayerInner {
.into(),
);
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -691,7 +700,8 @@ impl ImageLayerInner {
#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
ImageLayerIterator {

View File

@@ -250,6 +250,8 @@ impl Layer {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
// Newly created layers are marked visible by default: the usual case is that they were created to be read.
access_stats.set_visibility(super::LayerVisibility::Visible);
let local_path = local_layer_path(
conf,
@@ -1685,6 +1687,7 @@ impl DownloadedLayer {
lsn,
summary,
Some(owner.conf.max_vectored_read_bytes),
owner.conf.image_compression.allow_decompression(),
ctx,
)
.await

View File

@@ -14,6 +14,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use arc_swap::ArcSwap;
use bytes::Bytes;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use once_cell::sync::Lazy;
@@ -29,7 +30,7 @@ use pageserver_api::{
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use serde_with::serde_as;
@@ -134,7 +135,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::{config::TenantConf, storage_layer::LayerVisibility};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -364,6 +365,7 @@ pub struct Timeline {
repartition_threshold: u64,
last_image_layer_creation_check_at: AtomicLsn,
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -451,24 +453,44 @@ pub struct WalReceiverInfo {
/// Garbage Collection.
#[derive(Default)]
pub(crate) struct GcInfo {
/// Specific LSNs that are needed.
/// Record which parts of this timeline's history are still needed by children
///
/// Currently, this includes all points where child branches have
/// been forked off from. In the future, could also include
/// explicit user-defined snapshot points.
pub(crate) retain_lsns: Vec<Lsn>,
/// Optionally store each child's keyspace at their branch LSN: parts of the keyspace not covered here may be dropped during GC, as
/// the child will never read them. For example, a child which has covered its whole keyspace with image layers
/// will put an empty keyspace here. Children populate this: if it is None, presume the child may read any part of the keyspace.
pub(crate) retain_lsns: Vec<(Lsn, TimelineId, Option<KeySpace>)>,
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
/// Whether our branch point is within our ancestor's PITR interval (for cost estimation)
pub(crate) within_ancestor_pitr: bool,
}
impl GcInfo {
pub(crate) fn min_cutoff(&self) -> Lsn {
self.cutoffs.select_min()
}
pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
self.retain_lsns.push((child_lsn, child_id, None));
self.retain_lsns.sort_by_key(|i| i.0);
}
pub(super) fn remove_child(&mut self, child_id: TimelineId) {
self.retain_lsns.retain(|i| i.1 != child_id);
}
/// When the child re-calculates which parts of the keyspace it will read from the ancestor, it posts
/// and update to the parent using this function, to enable the parent to perhaps GC more layers.
pub(super) fn notify_child_keyspace(&mut self, child_id: TimelineId, key_space: KeySpace) {
if let Ok(idx) = self.retain_lsns.binary_search_by_key(&child_id, |i| i.1) {
self.retain_lsns.get_mut(idx).unwrap().2 = Some(key_space);
}
}
}
/// The `GcInfo` component describing which Lsns need to be retained.
@@ -688,7 +710,7 @@ pub enum GetLogicalSizePriority {
Background,
}
#[derive(enumset::EnumSetType, Debug)]
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
@@ -851,6 +873,18 @@ impl Timeline {
.map(|ancestor| ancestor.timeline_id)
}
/// Get the bytes written since the PITR cutoff on this branch, and
/// whether this branch's ancestor_lsn is within its parent's PITR.
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
let gc_info = self.gc_info.read().unwrap();
let history = self
.get_last_record_lsn()
.checked_sub(gc_info.cutoffs.pitr)
.unwrap_or(Lsn(0))
.0;
(history, gc_info.within_ancestor_pitr)
}
/// Lock and get timeline's GC cutoff
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
@@ -1269,15 +1303,14 @@ impl Timeline {
if avg >= Self::VEC_GET_LAYERS_VISITED_WARN_THRESH {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
tracing::info!(
tenant_id = %self.tenant_shard_id.tenant_id,
shard_id = %self.tenant_shard_id.shard_slug(),
timeline_id = %self.timeline_id,
"Vectored read for {} visited {} layers on average per key and {} in total. {}/{} pages were returned",
keyspace, avg, layers_visited, results.len(), keyspace.total_raw_size());
shard_id = %self.tenant_shard_id.shard_slug(),
lsn = %lsn,
"Vectored read for {} visited {} layers on average per key and {} in total. {}/{} pages were returned",
keyspace, avg, layers_visited, results.len(), keyspace.total_raw_size());
});
}
@@ -1576,7 +1609,13 @@ impl Timeline {
let existing_lease = occupied.get_mut();
if valid_until > existing_lease.valid_until {
existing_lease.valid_until = valid_until;
let dt: DateTime<Utc> = valid_until.into();
info!("lease extended to {}", dt);
} else {
let dt: DateTime<Utc> = existing_lease.valid_until.into();
info!("existing lease covers greater length, valid until {}", dt);
}
existing_lease.clone()
} else {
// Reject already GC-ed LSN (lsn < latest_gc_cutoff)
@@ -1585,6 +1624,8 @@ impl Timeline {
bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
let dt: DateTime<Utc> = valid_until.into();
info!("lease created, valid until {}", dt);
entry.or_insert(LsnLease { valid_until }).clone()
}
};
@@ -1769,9 +1810,26 @@ impl Timeline {
}
match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await?,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await?,
}
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.
// TODO: make `partitioning` a sync lock: see comment in `repartition()` for why there's no
// real async use.
let rewrite_max = self.partitioning.try_lock().unwrap().0 .0.parts.len();
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
// TODO: be more selective: call this once at startup, and thereafter only when some branching changes or
// when image layer are generated.
self.update_layer_visibility(ctx).await?;
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
@@ -2361,6 +2419,7 @@ impl Timeline {
)),
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
@@ -2942,6 +3001,17 @@ impl Timeline {
.set((calculated_size, metrics_guard.calculation_result_saved()))
.ok()
.expect("only this task sets it");
// As a nice-to-have, calculate layer visibilties. Otherwise this will
// be initialized on first compaction. Doing it as early as possible
// enables code that depends on layer visibility (like uploading heatmaps)
// to execute earlier, rather than waiting for compaction.
match self.update_layer_visibility(&background_ctx).await {
Ok(_) | Err(CompactionError::ShuttingDown) => {}
Err(e) => {
tracing::warn!("Initial layer visibility calculation failed: {e}");
}
}
}
pub(crate) fn spawn_ondemand_logical_size_calculation(
@@ -3119,7 +3189,8 @@ impl Timeline {
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
/// indicating which layers are currently on-disk on the primary.
/// indicating which layers should be downloaded on the secondary to give it a warm
/// cache, that will enable it to take over as the attached location without degrading performance.
///
/// None is returned if the Timeline is in a state where uploading a heatmap
/// doesn't make sense, such as shutting down or initializing. The caller
@@ -3132,19 +3203,32 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity_or_now();
let mut resident_visible_layers = Vec::new();
let now = SystemTime::now();
for layer in guard.likely_resident_layers() {
let (atime, visibility) = layer.access_stats().atime_visibility();
HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
)
});
match visibility {
LayerVisibility::Uninitialized => {
// Refuse to generate a heatmap at all until layer visibilty is initialized
return None;
}
LayerVisibility::Covered => {
// This layer is covered: exclude it from the heatmap because a secondary
// node is highly unlikely to need this layer in the event that it takes over as attached
}
LayerVisibility::Visible => resident_visible_layers.push(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
atime.unwrap_or(now),
)),
}
}
let layers = resident.collect();
Some(HeatMapTimeline::new(self.timeline_id, layers))
Some(HeatMapTimeline::new(
self.timeline_id,
resident_visible_layers,
))
}
/// Returns true if the given lsn is or was an ancestor branchpoint.
@@ -4441,6 +4525,58 @@ impl Timeline {
}
}
/// Predicate function which indicates whether we should check if new image layers
/// are required. Since checking if new image layers are required is expensive in
/// terms of CPU, we only do it in the following cases:
/// 1. If the timeline has ingested sufficient WAL to justify the cost
/// 2. If enough time has passed since the last check
/// 2.1. For large tenants, we wish to perform the check more often since they
/// suffer from the lack of image layers
/// 2.2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
.checked_sub(last_checks_at)
.expect("Attempt to compact with LSN going backwards");
let min_distance =
self.get_image_layer_creation_check_threshold() as u64 * self.get_checkpoint_distance();
let distance_based_decision = distance.0 >= min_distance;
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
{
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
}
None => true,
};
}
// Do the expensive delta layer counting only if this timeline has ingested sufficient
// WAL since the last check or a checkpoint timeout interval has elapsed since the last
// check.
let decision = distance_based_decision || time_based_decision;
if decision {
self.last_image_layer_creation_check_at.store(lsn);
*last_check_instant = Some(Instant::now());
}
decision
}
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
@@ -4463,27 +4599,9 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
let check_for_image_layers = {
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
.checked_sub(last_checks_at)
.expect("Attempt to compact with LSN going backwards");
let min_distance = self.get_image_layer_creation_check_threshold() as u64
* self.get_checkpoint_distance();
// Skip the expensive delta layer counting if this timeline has not ingested sufficient
// WAL since the last check.
distance.0 >= min_distance
};
if check_for_image_layers {
self.last_image_layer_creation_check_at.store(lsn);
}
tracing::info!("Compacting image layers at lsn {lsn} with creation mode {mode:?} check_for_image_layers={check_for_image_layers}");
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
for partition in partitioning.parts.iter() {
tracing::info!("Looking at partition {partition}");
let img_range = start..partition.ranges.last().unwrap().end;
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
@@ -4506,9 +4624,7 @@ impl Timeline {
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
let time_for_new_image_layer = self.time_for_new_image_layer(partition, lsn).await;
if !check_for_image_layers || time_for_new_image_layer {
tracing::info!("Skipping image layer creation check_for_image_layers={check_for_image_layers} time_for_new_image_layer={time_for_new_image_layer}");
if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await {
start = img_range.end;
continue;
}
@@ -4740,6 +4856,42 @@ impl DurationRecorder {
}
}
/// Descriptor for a delta layer used in testing infra. The start/end key/lsn range of the
/// delta layer might be different from the min/max key/lsn in the delta layer. Therefore,
/// the layer descriptor requires the user to provide the ranges, which should cover all
/// keys specified in the `data` field.
#[cfg(test)]
pub struct DeltaLayerTestDesc {
pub lsn_range: Range<Lsn>,
pub key_range: Range<Key>,
pub data: Vec<(Key, Lsn, Value)>,
}
#[cfg(test)]
impl DeltaLayerTestDesc {
#[allow(dead_code)]
pub fn new(lsn_range: Range<Lsn>, key_range: Range<Key>, data: Vec<(Key, Lsn, Value)>) -> Self {
Self {
lsn_range,
key_range,
data,
}
}
pub fn new_with_inferred_key_range(
lsn_range: Range<Lsn>,
data: Vec<(Key, Lsn, Value)>,
) -> Self {
let key_min = data.iter().map(|(key, _, _)| key).min().unwrap();
let key_max = data.iter().map(|(key, _, _)| key).max().unwrap();
Self {
key_range: (*key_min)..(key_max.next()),
lsn_range,
data,
}
}
}
impl Timeline {
async fn finish_compact_batch(
self: &Arc<Self>,
@@ -4942,7 +5094,11 @@ impl Timeline {
let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.cutoffs.pitr;
let retain_lsns = gc_info.retain_lsns.clone();
let retain_lsns = gc_info
.retain_lsns
.iter()
.map(|(lsn, _child_id, _)| *lsn)
.collect();
// Gets the maximum LSN that holds the valid lease.
//
@@ -5540,37 +5696,65 @@ impl Timeline {
#[cfg(test)]
pub(super) async fn force_create_delta_layer(
self: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
mut deltas: DeltaLayerTestDesc,
check_start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
let min_key = *deltas.first().map(|(k, _, _)| k).unwrap();
let end_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
let max_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
deltas
.data
.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
assert!(deltas.data.first().unwrap().0 >= deltas.key_range.start);
assert!(deltas.data.last().unwrap().0 < deltas.key_range.end);
for (_, lsn, _) in &deltas.data {
assert!(deltas.lsn_range.start <= *lsn && *lsn < deltas.lsn_range.end);
}
assert!(
max_lsn <= last_record_lsn,
"advance last record lsn before inserting a layer, max_lsn={max_lsn}, last_record_lsn={last_record_lsn}"
deltas.lsn_range.end <= last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
deltas.lsn_range.end,
last_record_lsn
);
let end_lsn = Lsn(max_lsn.0 + 1);
if let Some(check_start_lsn) = check_start_lsn {
assert!(min_lsn >= check_start_lsn);
assert!(deltas.lsn_range.start >= check_start_lsn);
}
// check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of
// layers of the same start/end LSN, and so should the force inserted layer
{
/// Checks if a overlaps with b, assume a/b = [start, end).
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
if layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
{
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
panic!(
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end
);
}
}
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
min_key,
min_lsn..end_lsn,
deltas.key_range.start,
deltas.lsn_range,
ctx,
)
.await?;
for (key, lsn, val) in deltas {
for (key, lsn, val) in deltas.data {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer.finish(end_key, self, ctx).await?;
let delta_layer = delta_layer_writer
.finish(deltas.key_range.end, self, ctx)
.await?;
{
let mut guard = self.layers.write().await;

View File

@@ -19,14 +19,14 @@ use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::storage_layer::{AsLayerDesc, LayerVisibility, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -51,8 +51,6 @@ impl Timeline {
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
tracing::info!("Compacting with flags {flags:?}");
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
return self.compact_with_gc(cancel, ctx).await;
}
@@ -102,7 +100,7 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
let partition_count = match self
match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
@@ -142,7 +140,6 @@ impl Timeline {
.await?;
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -154,19 +151,9 @@ impl Timeline {
if !self.cancel.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
1
}
};
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.
let rewrite_max = partition_count;
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(())
}
@@ -178,7 +165,7 @@ impl Timeline {
///
/// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
/// how much work it will try to do in each compaction pass.
async fn compact_shard_ancestors(
pub(super) async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
ctx: &RequestContext,
@@ -360,6 +347,88 @@ impl Timeline {
Ok(())
}
/// A post-compaction step to update the LayerVisibility of layers covered by image layers. This
/// should also be called when new branches are created.
///
/// Sweep through the layer map, identifying layers which are covered by image layers
/// such that they do not need to be available to service reads. The resulting LayerVisibility
/// result may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(
&self,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// Start with a keyspace representing all the keys we need to read from the tip of the branch
let head_lsn = self.get_last_record_lsn();
let (mut head_keyspace, sparse_ks) = self.collect_keyspace(head_lsn, ctx).await?;
// Converting the sparse part of the keyspace into the dense keyspace is safe in this context
// because we will never iterate through the keys.
head_keyspace.merge(&sparse_ks.0);
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
// are implicitly visible, because LayerVisibility's default is Visible, and we never modify it here.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let mut visible_size: u64 = 0;
// FIXME: we only get accurate keyspaces from children if they've already run update_layer_visibility themselves. At startup all the timelines
// initialize this in arbitrary order (at the end of initial_logical_size_calculation). We should coordinate these. Perhaps at the very start
// of the tenant compaction task we should do all the timelines' layer visibility calculations in a leaf-first order?
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id, child_keyspace) in &children {
let keyspace = match child_keyspace {
Some(ks) => ks.clone(),
None => {
// The child has not posted information about which parts of the keyspace they depend on: presume they depend on all of it.
let (mut keyspace, sparse_keyspace) =
self.collect_keyspace(*child_lsn, ctx).await?;
keyspace.merge(&sparse_keyspace.0);
keyspace
}
};
readable_points.push((*child_lsn, keyspace));
}
readable_points.push((head_lsn, head_keyspace));
readable_points
};
let (layer_visibility, shadow) = layer_map.get_visibility(readable_points);
for (layer_desc, visibility) in layer_visibility {
// FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
let layer = layer_manager.get_from_desc(&layer_desc);
if matches!(visibility, LayerVisibility::Visible) {
visible_size += layer.metadata().file_size;
}
layer.access_stats().set_visibility(visibility);
}
if let Some(ancestor) = &self.ancestor_timeline {
// Having calculated the readable keyspace after walking back through all this timeline's layers, the resulting keyspace is the remaining
// keys for which reads may still fall through to the parent branch. Notify the parent branch of this, so that they may GC layers which
// do not overlap with this keyspace, and so that they may use this as an input to their own visibility updates.
ancestor
.gc_info
.write()
.unwrap()
.notify_child_keyspace(self.timeline_id, shadow);
}
// Also include in the visible size all the layers which we would never update visibility on
// TODO: getter that doesn't spuriously construct a Vec<>
for layer in layer_map.get_level0_deltas().unwrap() {
visible_size += layer.file_size;
}
self.metrics.visible_physical_size_gauge.set(visible_size);
Ok(())
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
async fn compact_level0(

View File

@@ -148,14 +148,14 @@ async fn cleanup_remaining_timeline_fs_traces(
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
async fn remove_timeline_from_tenant(
tenant: &Tenant,
timeline_id: TimelineId,
timeline: &Timeline,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
// Remove the timeline from the map.
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
@@ -163,8 +163,14 @@ async fn remove_timeline_from_tenant(
panic!("Timeline grew children while we removed layer files");
}
// Unlink from parent
if let Some(ancestor) = timeline.get_ancestor_timeline() {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
ancestor_gc_info.remove_child(timeline.timeline_id);
}
timelines
.remove(&timeline_id)
.remove(&timeline.timeline_id)
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
drop(timelines);
@@ -293,6 +299,9 @@ impl DeleteTimelineFlow {
{
let mut locked = tenant.timelines.lock().unwrap();
locked.insert(timeline_id, Arc::clone(&timeline));
// Note that we do not insert this into the parent branch's GcInfo: the parent is not obliged to retain
// any data for child timelines being deleted.
}
guard.mark_in_progress()?;
@@ -413,7 +422,7 @@ impl DeleteTimelineFlow {
pausable_failpoint!("in_progress_delete");
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
remove_timeline_from_tenant(tenant, timeline, &guard).await?;
*guard = Self::Finished;

View File

@@ -255,6 +255,14 @@ impl LayerManager {
new_layer.layer_desc().lsn_range
);
// Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
// always marking rewritten layers as visible.
new_layer
.as_ref()
.access_stats()
.set_visibility(old_layer.access_stats().get_visibility());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,
// such as an increment in the generation number.

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
@@ -208,14 +208,9 @@ pub(super) async fn handle_walreceiver_connection(
.instrument(tracing::info_span!("poller")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let _guard = LIVE_CONNECTIONS
.with_label_values(&["wal_receiver"])
.guard();
let identify = identify_system(&replication_client).await?;
info!("{identify:?}");

View File

@@ -6,6 +6,7 @@ OBJS = \
$(WIN32RES) \
extension_server.o \
file_cache.o \
hll.o \
libpagestore.o \
neon.o \
neon_utils.o \
@@ -22,7 +23,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -26,7 +26,6 @@
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "lib/hyperloglog.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
@@ -40,6 +39,8 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "hll.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
* All blocks of all relations are stored inside one file and addressed using shared hash map.
@@ -62,7 +63,6 @@
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define MB ((uint64)1024*1024)
#define HYPER_LOG_LOG_BIT_WIDTH 10
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
typedef struct FileCacheEntry
@@ -87,8 +87,7 @@ typedef struct FileCacheControl
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
hyperLogLogState wss_estimation; /* estimation of wroking set size */
uint8_t hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
static HTAB *lfc_hash;
@@ -238,12 +237,7 @@ lfc_shmem_startup(void)
dlist_init(&lfc_ctl->lru);
/* Initialize hyper-log-log structure for estimating working set size */
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
/* We need hashes in shared memory */
pfree(lfc_ctl->wss_estimation.hashesArr);
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
initSHLL(&lfc_ctl->wss_estimation);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
@@ -545,7 +539,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Approximate working set */
tag.blockNum = blkno;
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
@@ -986,20 +980,38 @@ local_cache_pages(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
{
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(approximate_working_set_size);
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
int32 dc = -1;
if (lfc_size_limit != 0)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
if (reset)
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_INT32(dc);
PG_RETURN_NULL();
}

193
pgxn/neon/hll.c Normal file
View File

@@ -0,0 +1,193 @@
/*-------------------------------------------------------------------------
*
* hll.c
* Sliding HyperLogLog cardinality estimator
*
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
* Engineering of a State of The Art Cardinality Estimation Algorithm".
*
* A sparse representation of HyperLogLog state is used, with fixed space
* overhead.
*
* The copyright terms of Ohno's original version (the MIT license) follow.
*
* IDENTIFICATION
* src/backend/lib/hyperloglog.c
*
*-------------------------------------------------------------------------
*/
/*
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the 'Software'), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <math.h>
#include "postgres.h"
#include "funcapi.h"
#include "port/pg_bitutils.h"
#include "utils/timestamp.h"
#include "hll.h"
#define POW_2_32 (4294967296.0)
#define NEG_POW_2_32 (-4294967296.0)
#define ALPHA_MM ((0.7213 / (1.0 + 1.079 / HLL_N_REGISTERS)) * HLL_N_REGISTERS * HLL_N_REGISTERS)
/*
* Worker for addHyperLogLog().
*
* Calculates the position of the first set bit in first b bits of x argument
* starting from the first, reading from most significant to least significant
* bits.
*
* Example (when considering fist 10 bits of x):
*
* rho(x = 0b1000000000) returns 1
* rho(x = 0b0010000000) returns 3
* rho(x = 0b0000000000) returns b + 1
*
* "The binary address determined by the first b bits of x"
*
* Return value "j" used to index bit pattern to watch.
*/
static inline uint8
rho(uint32 x, uint8 b)
{
uint8 j = 1;
if (x == 0)
return b + 1;
j = 32 - pg_leftmost_one_pos32(x);
if (j > b)
return b + 1;
return j;
}
/*
* Initialize HyperLogLog track state
*/
void
initSHLL(HyperLogLogState *cState)
{
memset(cState->regs, 0, sizeof(cState->regs));
}
/*
* Adds element to the estimator, from caller-supplied hash.
*
* It is critical that the hash value passed be an actual hash value, typically
* generated using hash_any(). The algorithm relies on a specific bit-pattern
* observable in conjunction with stochastic averaging. There must be a
* uniform distribution of bits in hash values for each distinct original value
* observed.
*/
void
addSHLL(HyperLogLogState *cState, uint32 hash)
{
uint8 count;
uint32 index;
size_t i;
size_t j;
TimestampTz now = GetCurrentTimestamp();
/* Use the first "k" (registerWidth) bits as a zero based index */
index = hash >> HLL_C_BITS;
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
cState->regs[index][count] = now;
}
static uint8
getMaximum(const TimestampTz* reg, TimestampTz since)
{
uint8 max = 0;
for (size_t i = 0; i < HLL_C_BITS + 1; i++)
{
if (reg[i] >= since)
{
max = i;
}
}
return max;
}
/*
* Estimates cardinality, based on elements added so far
*/
double
estimateSHLL(HyperLogLogState *cState, time_t duration)
{
double result;
double sum = 0.0;
size_t i;
uint8 R[HLL_N_REGISTERS];
/* 0 indicates uninitialized timestamp, so if we need to cover the whole range than starts with 1 */
TimestampTz since = duration == (time_t)-1 ? 1 : GetCurrentTimestamp() - duration * USECS_PER_SEC;
for (i = 0; i < HLL_N_REGISTERS; i++)
{
R[i] = getMaximum(cState->regs[i], since);
sum += 1.0 / pow(2.0, R[i]);
}
/* result set to "raw" HyperLogLog estimate (E in the HyperLogLog paper) */
result = ALPHA_MM / sum;
if (result <= (5.0 / 2.0) * HLL_N_REGISTERS)
{
/* Small range correction */
int zero_count = 0;
for (i = 0; i < HLL_N_REGISTERS; i++)
{
zero_count += R[i] == 0;
}
if (zero_count != 0)
result = HLL_N_REGISTERS * log((double) HLL_N_REGISTERS /
zero_count);
}
else if (result > (1.0 / 30.0) * POW_2_32)
{
/* Large range correction */
result = NEG_POW_2_32 * log(1.0 - (result / POW_2_32));
}
return result;
}

86
pgxn/neon/hll.h Normal file
View File

@@ -0,0 +1,86 @@
/*-------------------------------------------------------------------------
*
* hll.h
* Sliding HyperLogLog cardinality estimator
*
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
* the Heule, Nunkesser and Hall paper "HyperLogLog in Practice: Algorithmic
* Engineering of a State of The Art Cardinality Estimation Algorithm".
*
* A sparse representation of HyperLogLog state is used, with fixed space
* overhead.
*
* The copyright terms of Ohno's original version (the MIT license) follow.
*
* IDENTIFICATION
* src/backend/lib/hyperloglog.c
*
*-------------------------------------------------------------------------
*/
/*
* Copyright (c) 2013 Hideaki Ohno <hide.o.j55{at}gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the 'Software'), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#ifndef HLL_H
#define HLL_H
#define HLL_BIT_WIDTH 10
#define HLL_C_BITS (32 - HLL_BIT_WIDTH)
#define HLL_N_REGISTERS (1 << HLL_BIT_WIDTH)
/*
* HyperLogLog is an approximate technique for computing the number of distinct
* entries in a set. Importantly, it does this by using a fixed amount of
* memory. See the 2007 paper "HyperLogLog: the analysis of a near-optimal
* cardinality estimation algorithm" for more.
*
* Instead of a single counter for every bits register, we have a timestamp
* for every valid number of bits we can encounter. Every time we encounter
* a certain number of bits, we update the timestamp in those registers to
* the current timestamp.
*
* We can query the sketch's stored cardinality for the range of some timestamp
* up to now: For each register, we return the highest bits bucket that has a
* modified timestamp >= the query timestamp. This value is the number of bits
* for this register in the normal HLL calculation.
*
* The memory usage is 2^B * (C + 1) * sizeof(TimetampTz), or 184kiB.
* Usage could be halved if we decide to reduce the required time dimension
* precision; as 32 bits in second precision should be enough for statistics.
* However, that is not yet implemented.
*/
typedef struct HyperLogLogState
{
TimestampTz regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
} HyperLogLogState;
extern void initSHLL(HyperLogLogState *cState);
extern void addSHLL(HyperLogLogState *cState, uint32 hash);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration);
#endif

View File

@@ -0,0 +1,9 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.4'" to load this file. \quit
CREATE FUNCTION approximate_working_set_size_seconds(duration integer default null)
RETURNS integer
AS 'MODULE_PATHNAME', 'approximate_working_set_size_seconds'
LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;

View File

@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS approximate_working_set_size_seconds(integer) CASCADE;

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.2.sql
DATA = neon_test_utils--1.3.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -45,3 +45,21 @@ CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION trigger_panic()
RETURNS VOID
AS 'MODULE_PATHNAME', 'trigger_panic'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION trigger_segfault()
RETURNS VOID
AS 'MODULE_PATHNAME', 'trigger_segfault'
LANGUAGE C PARALLEL UNSAFE;
-- Alias for `trigger_segfault`, just because `SELECT 💣()` looks fun
CREATE OR REPLACE FUNCTION 💣() RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
PERFORM trigger_segfault();
END;
$$;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.2'
default_version = '1.3'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -42,6 +42,8 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(trigger_panic);
PG_FUNCTION_INFO_V1(trigger_segfault);
/*
* Linkage to functions in neon module.
@@ -489,3 +491,24 @@ neon_xlogflush(PG_FUNCTION_ARGS)
XLogFlush(lsn);
PG_RETURN_VOID();
}
/*
* Function to trigger panic.
*/
Datum
trigger_panic(PG_FUNCTION_ARGS)
{
elog(PANIC, "neon_test_utils: panic");
PG_RETURN_VOID();
}
/*
* Function to trigger a segfault.
*/
Datum
trigger_segfault(PG_FUNCTION_ARGS)
{
int *ptr = NULL;
*ptr = 42;
PG_RETURN_VOID();
}

View File

@@ -53,6 +53,13 @@ impl<C: Cache, V> Cached<C, V> {
)
}
pub fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
Cached {
token: self.token,
value: f(self.value),
}
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(self) -> V {
if let Some((cache, info)) = &self.token {

View File

@@ -65,6 +65,8 @@ impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
ttl: Duration,
update_ttl_on_retrieval: bool,
value: T,
}
@@ -122,7 +124,6 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
@@ -142,7 +143,8 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
if self.update_ttl_on_retrieval {
let deadline = now.checked_add(raw_entry.get().ttl).expect("time overflow");
if raw_entry.get().update_ttl_on_retrieval {
raw_entry.get_mut().expires_at = deadline;
}
raw_entry.to_back();
@@ -162,12 +164,27 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw_ttl(
&self,
key: K,
value: V,
ttl: Duration,
update: bool,
) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let expires_at = created_at.checked_add(ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
ttl,
update_ttl_on_retrieval: update,
value,
};
@@ -190,6 +207,21 @@ impl<K: Hash + Eq, V> TimedLru<K, V> {
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub fn insert_ttl(&self, key: K, value: V, ttl: Duration) {
self.insert_raw_ttl(key, value, ttl, false);
}
pub fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (created_at, old) = self.insert_raw(key.clone(), value);
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
value: (),
};
(old, cached)
}
pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
let (created_at, old) = self.insert_raw(key.clone(), value.clone());

View File

@@ -9,7 +9,7 @@ use crate::proxy::retry::CouldRetry;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
@@ -82,41 +82,19 @@ impl CouldRetry for ConsoleError {
.details
.error_info
.map_or(Reason::Unknown, |e| e.reason);
match reason {
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations => true,
Reason::ConcurrencyLimitReached => true,
Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
reason.can_retry()
}
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Status {
pub code: Box<str>,
pub message: Box<str>,
pub details: Details,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Details {
pub error_info: Option<ErrorInfo>,
pub retry_info: Option<RetryInfo>,
@@ -199,6 +177,34 @@ impl Reason {
| Reason::BranchNotFound
)
}
pub fn can_retry(&self) -> bool {
match self {
// do not retry role protected errors
// not a transitive error
Reason::RoleProtected => false,
// on retry, it will still not be found
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound => false,
// we were asked to go away
Reason::RateLimitExceeded
| Reason::NonDefaultBranchComputeTimeExceeded
| Reason::ActiveTimeQuotaExceeded
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations
| Reason::ConcurrencyLimitReached
| Reason::LockAlreadyTaken => true,
// unknown error. better not retry it.
Reason::Unknown => false,
}
}
}
#[derive(Copy, Clone, Debug, Deserialize)]
@@ -206,7 +212,7 @@ pub struct RetryInfo {
pub retry_delay_ms: u64,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct UserFacingMessage {
pub message: Box<str>,
}

View File

@@ -2,7 +2,7 @@
pub mod mock;
pub mod neon;
use super::messages::MetricsAuxInfo;
use super::messages::{ConsoleError, MetricsAuxInfo};
use crate::{
auth::{
backend::{ComputeCredentialKeys, ComputeUserInfo},
@@ -317,8 +317,8 @@ impl NodeInfo {
}
}
pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
pub type NodeInfoCache = TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ConsoleError>>>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;

View File

@@ -9,7 +9,7 @@ use super::{
use crate::{
auth::backend::ComputeUserInfo,
compute,
console::messages::ColdStartInfo,
console::messages::{ColdStartInfo, Reason},
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
@@ -17,10 +17,10 @@ use crate::{
};
use crate::{cache::Cached, context::RequestMonitoring};
use futures::TryFutureExt;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};
pub struct Api {
endpoint: http::Endpoint,
@@ -273,26 +273,34 @@ impl super::Api for Api {
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key();
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ApiError(ApiError::Console(*c))
})?;
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
}
};
}
// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
}
check_cache!();
let permit = self.locks.get_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
}
check_cache!();
}
// check rate limit
@@ -300,23 +308,56 @@ impl super::Api for Api {
.wake_compute_endpoint_rate_limiter
.check(user_info.endpoint.normalize_intern(), 1)
{
info!(key = &*key, "found cached compute node info");
return Err(WakeComputeError::TooManyConnections);
}
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
ctx.set_project(node.aux.clone());
let cold_start_info = node.aux.cold_start_info;
info!("woken up a compute node");
let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
match node {
Ok(node) => {
ctx.set_project(node.aux.clone());
debug!(key = &*key, "created a cache entry for woken compute node");
// store the cached node as 'warm'
node.aux.cold_start_info = ColdStartInfo::WarmCached;
let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
cached.aux.cold_start_info = cold_start_info;
let mut stored_node = node.clone();
// store the cached node as 'warm_cached'
stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
info!(key = &*key, "created a cache entry for compute node info");
let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
Ok(cached)
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ApiError(ApiError::Console(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ApiError(ApiError::Console(err)));
};
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ApiError(ApiError::Console(err)));
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
self.caches.node_info.insert_ttl(
key,
Err(Box::new(err.clone())),
Duration::from_secs(30),
);
Err(WakeComputeError::ApiError(ApiError::Console(err)))
}
err => return Err(err),
},
}
}
}

View File

@@ -540,8 +540,8 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
node2.map(|()| node)
}
fn helper_create_connect_info(

View File

@@ -445,6 +445,19 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
let timeline_housekeeping_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(async move {
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
loop {
tokio::time::sleep(TOMBSTONE_TTL).await;
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
}
})
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt

View File

@@ -15,12 +15,19 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
@@ -64,11 +71,17 @@ impl GlobalTimelinesState {
.cloned()
.ok_or(TimelineError::NotFound(*ttid))
}
fn delete(&mut self, ttid: TenantTimelineId) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
}
}
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
@@ -198,11 +211,17 @@ impl GlobalTimelines {
let tli = Arc::new(timeline);
// TODO: prevent concurrent timeline creation/loading
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
{
let mut state = TIMELINES_STATE.lock().unwrap();
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("Un-deleted timeline {ttid}");
}
state.timelines.insert(ttid, tli.clone());
}
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
@@ -229,7 +248,7 @@ impl GlobalTimelines {
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub async fn create(
pub(crate) async fn create(
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
@@ -241,6 +260,11 @@ impl GlobalTimelines {
// Timeline already exists, return it.
return Ok(timeline);
}
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
}
state.get_dependencies()
};
@@ -300,17 +324,19 @@ impl GlobalTimelines {
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled.
pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
match res {
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();
state.get(&ttid)
};
match tli_res {
Ok(tli) => {
if tli.is_cancelled() {
return Err(TimelineError::Cancelled(ttid));
}
Ok(tli)
}
_ => res,
_ => tli_res,
}
}
@@ -339,12 +365,26 @@ impl GlobalTimelines {
/// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete(
pub(crate) async fn delete(
ttid: &TenantTimelineId,
only_local: bool,
) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active: false,
});
}
state.get(ttid)
};
let result = match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);
@@ -354,11 +394,6 @@ impl GlobalTimelines {
info!("deleting timeline {}, only_local={}", ttid, only_local);
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
// https://github.com/neondatabase/neon/issues/3146
// TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
Ok(TimelineDeleteForceResult {
dir_existed,
was_active, // TODO: we probably should remove this field
@@ -374,7 +409,14 @@ impl GlobalTimelines {
was_active: false,
})
}
}
};
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
TIMELINES_STATE.lock().unwrap().delete(*ttid);
result
}
/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
@@ -420,19 +462,20 @@ impl GlobalTimelines {
tenant_id,
))?;
// FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
// let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
// if !tlis_after_delete.is_empty() {
// // Some timelines were created while we were deleting them, returning error
// // to the caller, so it can retry later.
// bail!(
// "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
// tenant_id
// );
// }
Ok(deleted)
}
pub fn housekeeping(tombstone_ttl: &Duration) {
let mut state = TIMELINES_STATE.lock().unwrap();
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
}
#[derive(Clone, Copy, Serialize)]

View File

@@ -259,7 +259,7 @@ pub(crate) enum BlobDataParseResult {
Incorrect(Vec<String>),
}
fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
match name.rsplit_once('-') {
// FIXME: this is gross, just use a regex?
Some((layer_filename, gen)) if gen.len() == 8 => {

View File

@@ -0,0 +1,120 @@
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};
use crate::{
checks::parse_layer_object_name, init_remote, list_objects_with_retries,
metadata_stream::stream_tenants, BucketConfig, NodeKind,
};
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
enum LargeObjectKind {
DeltaLayer,
ImageLayer,
Other,
}
impl LargeObjectKind {
fn from_key(key: &str) -> Self {
let fname = key.split('/').last().unwrap();
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
return LargeObjectKind::Other;
};
match layer_name {
LayerName::Image(_) => LargeObjectKind::ImageLayer,
LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct LargeObject {
pub key: String,
pub size: u64,
kind: LargeObjectKind,
}
#[derive(Serialize, Deserialize)]
pub struct LargeObjectListing {
pub objects: Vec<LargeObject>,
}
pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok((tenant_shard_id, objects, total_objects_ctr))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
objects.extend_from_slice(&objects_slice);
object_ctr += total_objects_ctr;
tenant_ctr += 1;
if tenant_ctr % 100 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
objects.len()
);
}
}
let bucket_name = target.bucket_name();
tracing::info!(
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}

View File

@@ -2,6 +2,7 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;

View File

@@ -1,6 +1,7 @@
use anyhow::bail;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use storage_scrubber::find_large_objects;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
@@ -72,6 +73,14 @@ enum Command {
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
mode: GcMode,
},
FindLargeObjects {
#[arg(long = "min-size")]
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}
#[tokio::main]
@@ -86,6 +95,7 @@ async fn main() -> anyhow::Result<()> {
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
Command::FindLargeObjects { .. } => "find-large-objects",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -199,5 +209,20 @@ async fn main() -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
}
}

View File

@@ -144,6 +144,8 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
"pageserver_smgr_query_seconds_sum",
"pageserver_archive_size",
"pageserver_pitr_history_size",
"pageserver_storage_operations_seconds_count_total",
"pageserver_storage_operations_seconds_sum_total",
"pageserver_evictions_total",

View File

@@ -943,6 +943,8 @@ class NeonEnvBuilder:
# if the test threw an exception, don't check for errors
# as a failing assertion would cause the cleanup below to fail
ps_assert_metric_no_errors=(exc_type is None),
# do not fail on endpoint errors to allow the rest of cleanup to proceed
fail_on_endpoint_errors=False,
)
cleanup_error = None
@@ -1214,11 +1216,11 @@ class NeonEnv:
for f in futs:
f.result()
def stop(self, immediate=False, ps_assert_metric_no_errors=False):
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
"""
After this method returns, there should be no child processes running.
"""
self.endpoints.stop_all()
self.endpoints.stop_all(fail_on_endpoint_errors)
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
@@ -2113,6 +2115,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.running = False
return self
@staticmethod
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
@staticmethod
def raise_api_exception(res: requests.Response):
try:
@@ -2453,6 +2470,38 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
log.info("storage controller passed consistency check")
def poll_node_status(
self, node_id: int, desired_scheduling_policy: str, max_attempts: int, backoff: int
):
"""
Poll the node status until it reaches 'desired_scheduling_policy' or 'max_attempts' have been exhausted
"""
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = self.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
@@ -3852,9 +3901,17 @@ class EndpointFactory:
pageserver_id=pageserver_id,
)
def stop_all(self) -> "EndpointFactory":
def stop_all(self, fail_on_error=True) -> "EndpointFactory":
exception = None
for ep in self.endpoints:
ep.stop()
try:
ep.stop()
except Exception as e:
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
exception = e
if fail_on_error and exception is not None:
raise exception
return self

View File

@@ -599,6 +599,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res_json = res.json()
return res_json
def timeline_lsn_lease(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, lsn: Lsn
):
data = {
"lsn": str(lsn),
}
log.info(f"Requesting lsn lease for {lsn=}, {tenant_id=}, {timeline_id=}")
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/lsn_lease",
json=data,
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_get_timestamp_of_lsn(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, lsn: Lsn
):

View File

@@ -42,10 +42,6 @@ def single_timeline(
log.info("detach template tenant form pageserver")
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
log.info(f"duplicating template tenant {ncopies} times in S3")
tenants = fixtures.pageserver.remote_storage.duplicate_tenant(env, template_tenant, ncopies)

View File

@@ -55,10 +55,6 @@ def setup_env(
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")

View File

@@ -86,10 +86,6 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()

View File

@@ -1,4 +1,5 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, Tuple
@@ -17,30 +18,74 @@ from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
# The following tests use pagebench "getpage at latest LSN" to characterize the throughput of the pageserver.
# originally there was a single test named `test_pageserver_max_throughput_getpage_at_latest_lsn``
# so you still see some references to this name in the code.
# To avoid recreating the snapshots for each test, we continue to use the name `max_throughput_latest_lsn`
# for some files and metrics.
# For reference, the space usage of the snapshots:
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
# 137G /instance_store/test_output/shared-snapshots
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots/*
# 1.8G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-13
# 1.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-6
# 8.5G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-13
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
@pytest.mark.parametrize("n_tenants", [1, 10])
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type
def test_pageserver_max_throughput_getpage_at_latest_lsn(
# sudo du -hs /instance_store/neon/test_output/shared-snapshots/*
# 416G /instance_store/neon/test_output/shared-snapshots/max_throughput_latest_lsn-500-13
@pytest.mark.parametrize("duration", [60 * 60])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [500])
@pytest.mark.timeout(10000)
@pytest.mark.skipif(
os.getenv("CI", "false") == "true",
reason="This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI",
)
def test_pageserver_characterize_throughput_with_n_tenants(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
setup_and_run_pagebench_benchmark(
neon_env_builder, zenbenchmark, pg_bin, n_tenants, pgbench_scale, duration, 1
)
# For reference, the space usage of the snapshots:
# sudo du -hs /instance_store/neon/test_output/shared-snapshots/*
# 19G /instance_store/neon/test_output/shared-snapshots/max_throughput_latest_lsn-1-136
@pytest.mark.parametrize("duration", [20 * 60])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(2048)])
# we use 1 client to characterize latencies, and 64 clients to characterize throughput/scalability
# we use 64 clients because typically for a high number of connections we recommend the connection pooler
# which by default uses 64 connections
@pytest.mark.parametrize("n_clients", [1, 64])
@pytest.mark.parametrize("n_tenants", [1])
@pytest.mark.timeout(2400)
@pytest.mark.skipif(
os.getenv("CI", "false") == "true",
reason="This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI",
)
def test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
n_clients: int,
):
setup_and_run_pagebench_benchmark(
neon_env_builder, zenbenchmark, pg_bin, n_tenants, pgbench_scale, duration, n_clients
)
def setup_and_run_pagebench_benchmark(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
n_clients: int,
):
def record(metric, **kwargs):
zenbenchmark.record(
@@ -55,6 +100,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
"n_clients": (n_clients, {"unit": ""}),
}
)
@@ -96,7 +142,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
run_pagebench_benchmark(env, pg_bin, record, duration, n_clients)
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
@@ -118,10 +164,6 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
@@ -157,8 +199,8 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
return (template_tenant, template_timeline, config)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
def run_pagebench_benchmark(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int, n_clients: int
):
"""
Benchmark `env.pageserver` for max throughput @ latest LSN and record results in `zenbenchmark`.
@@ -172,6 +214,8 @@ def run_benchmark_max_throughput_latest_lsn(
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-clients",
str(n_clients),
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them

View File

@@ -22,7 +22,7 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
log.info("wait for all tenants to become active")
wait_until_all_tenants_state(
ps_http, "Active", iterations=n_tenants, period=1, http_error_ok=False
ps_http, "Active", iterations=10 + n_tenants, period=1, http_error_ok=False
)
# ensure all layers are resident for predictiable performance

View File

@@ -1,18 +1,89 @@
import concurrent.futures
import random
import time
from collections import defaultdict
from typing import Any, Dict
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]:
"""
Get the number of shards attached to each node.
This function takes into account the intersection of the intent and the observed state.
If they do not match, it asserts out.
"""
tenants = env.storage_controller.tenant_list()
intent = dict()
observed = dict()
tenant_placement: defaultdict[str, Dict[str, Any]] = defaultdict(
lambda: {
"observed": {"attached": None, "secondary": []},
"intent": {"attached": None, "secondary": []},
}
)
for t in tenants:
for node_id, loc_state in t["observed"]["locations"].items():
if (
loc_state is not None
and "conf" in loc_state
and loc_state["conf"] is not None
and loc_state["conf"]["mode"]
in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
):
observed[t["tenant_shard_id"]] = int(node_id)
tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
if (
loc_state is not None
and "conf" in loc_state
and loc_state["conf"] is not None
and loc_state["conf"]["mode"] == "Secondary"
):
tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id))
if "attached" in t["intent"]:
intent[t["tenant_shard_id"]] = t["intent"]["attached"]
tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
if "secondary" in t["intent"]:
tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"][
"secondary"
]
log.info(f"{tenant_placement=}")
matching = {
tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]
}
assert len(matching) == total_shards
attached_per_node: defaultdict[str, int] = defaultdict(int)
for node_id in matching.values():
attached_per_node[node_id] += 1
return attached_per_node
def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
attached_per_node = get_consistent_node_shard_counts(env, total_shards)
min_shard_count = min(attached_per_node.values())
max_shard_count = max(attached_per_node.values())
flake_factor = 5 / 100
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
@pytest.mark.timeout(3600) # super long running test: should go down as we optimize
def test_storage_controller_many_tenants(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure
@@ -44,7 +115,8 @@ def test_storage_controller_many_tenants(
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_start()
env = neon_env_builder.init_configs()
neon_env_builder.start()
# We will intentionally stress reconciler concurrrency, which triggers a warning when lots
# of shards are hitting the delayed path.
@@ -60,14 +132,6 @@ def test_storage_controller_many_tenants(
)
for ps in env.pageservers:
# This can happen because when we do a loop over all pageservers and mark them offline/active,
# reconcilers might get cancelled, and the next reconcile can follow a not-so-elegant path of
# bumping generation before other attachments are detached.
#
# We could clean this up by making reconcilers respect the .observed of their predecessor, if
# we spawn with a wait for the predecessor.
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
# Storage controller is allowed to drop pageserver requests when the cancellation token
# for a Reconciler fires.
ps.allowed_errors.append(".*request was dropped before completing.*")
@@ -79,6 +143,8 @@ def test_storage_controller_many_tenants(
shard_count = 2
stripe_size = 1024
total_shards = tenant_count * shard_count
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -195,10 +261,44 @@ def test_storage_controller_many_tenants(
env.storage_controller.consistency_check()
check_memory()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts before rolling restart: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
# and the storage controller drain and fill API
for ps in env.pageservers:
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(
ps.id, "PauseForRestart", max_attempts=24, backoff=5
)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
# Assert that we've drained the node
assert shard_counts[str(ps.id)] == 0
# Assert that those shards actually went somewhere
assert sum(shard_counts.values()) == total_shards
ps.restart()
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1)
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5)
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
# as they were not offline long enough to trigger any scheduling changes.

View File

@@ -1,4 +1,4 @@
FROM openjdk:21
FROM openjdk:22
WORKDIR /source
COPY . .

View File

@@ -1,2 +1,2 @@
pg8000==1.30.5
pg8000==1.31.2
scramp>=1.4.3

View File

@@ -4,9 +4,9 @@ version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
dependencies = [
"gimli",
]
@@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "async-trait"
version = "0.1.77"
version = "0.1.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
dependencies = [
"proc-macro2",
"quote",
@@ -30,15 +30,15 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.1.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.69"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
dependencies = [
"addr2line",
"cc",
@@ -63,9 +63,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.2"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "block-buffer"
@@ -78,9 +78,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.15.3"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
@@ -90,15 +90,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.5.0"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "cc"
version = "1.0.89"
version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0ba8f7aaa012f30d5b2861462f6708eccd49c3c39863fe083a308035f63d723"
checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d"
[[package]]
name = "cfg-if"
@@ -154,9 +154,9 @@ dependencies = [
[[package]]
name = "errno"
version = "0.3.8"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys 0.52.0",
@@ -170,15 +170,9 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "2.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "finl_unicode"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "foreign-types"
@@ -296,9 +290,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.12"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
@@ -307,9 +301,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.28.1"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "hmac"
@@ -329,29 +323,23 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.153"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock_api"
version = "0.4.11"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
@@ -375,15 +363,15 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.7.1"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.7.2"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
@@ -401,11 +389,10 @@ dependencies = [
[[package]]
name = "native-tls"
version = "0.2.11"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
@@ -419,9 +406,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.32.2"
version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
dependencies = [
"memchr",
]
@@ -438,7 +425,7 @@ version = "0.10.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f"
dependencies = [
"bitflags 2.4.2",
"bitflags 2.6.0",
"cfg-if",
"foreign-types",
"libc",
@@ -466,9 +453,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.101"
version = "0.9.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff"
checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2"
dependencies = [
"cc",
"libc",
@@ -478,9 +465,9 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.12.1"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
@@ -488,15 +475,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.9"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.5.2",
"smallvec",
"windows-targets 0.48.5",
"windows-targets 0.52.5",
]
[[package]]
@@ -525,9 +512,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.13"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
@@ -591,18 +578,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.78"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.35"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [
"proc-macro2",
]
@@ -646,6 +633,15 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "rust-neon-example"
version = "0.1.0"
@@ -658,17 +654,17 @@ dependencies = [
[[package]]
name = "rustc-demangle"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustix"
version = "0.38.31"
version = "0.38.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
dependencies = [
"bitflags 2.4.2",
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
@@ -692,11 +688,11 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "security-framework"
version = "2.9.2"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.6.0",
"core-foundation",
"core-foundation-sys",
"libc",
@@ -705,9 +701,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.9.1"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a"
checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
dependencies = [
"core-foundation-sys",
"libc",
@@ -741,15 +737,15 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.13.1"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys 0.52.0",
@@ -757,26 +753,26 @@ dependencies = [
[[package]]
name = "stringprep"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6"
checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
dependencies = [
"finl_unicode",
"unicode-bidi",
"unicode-normalization",
"unicode-properties",
]
[[package]]
name = "subtle"
version = "2.5.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.52"
version = "2.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07"
checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9"
dependencies = [
"proc-macro2",
"quote",
@@ -797,9 +793,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.6.0"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82"
dependencies = [
"tinyvec_macros",
]
@@ -812,9 +808,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.36.0"
version = "1.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
dependencies = [
"backtrace",
"bytes",
@@ -828,9 +824,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",
@@ -875,35 +871,15 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.10"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "tracing"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
]
[[package]]
@@ -933,6 +909,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-properties"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291"
[[package]]
name = "vcpkg"
version = "0.2.15"
@@ -1023,11 +1005,11 @@ dependencies = [
[[package]]
name = "whoami"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fec781d48b41f8163426ed18e8fc2864c12937df9ce54c88ede7bd47270893e"
checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9"
dependencies = [
"redox_syscall",
"redox_syscall 0.4.1",
"wasite",
"web-sys",
]
@@ -1047,7 +1029,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@@ -1067,17 +1049,18 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
dependencies = [
"windows_aarch64_gnullvm 0.52.4",
"windows_aarch64_msvc 0.52.4",
"windows_i686_gnu 0.52.4",
"windows_i686_msvc 0.52.4",
"windows_x86_64_gnu 0.52.4",
"windows_x86_64_gnullvm 0.52.4",
"windows_x86_64_msvc 0.52.4",
"windows_aarch64_gnullvm 0.52.5",
"windows_aarch64_msvc 0.52.5",
"windows_i686_gnu 0.52.5",
"windows_i686_gnullvm",
"windows_i686_msvc 0.52.5",
"windows_x86_64_gnu 0.52.5",
"windows_x86_64_gnullvm 0.52.5",
"windows_x86_64_msvc 0.52.5",
]
[[package]]
@@ -1088,9 +1071,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
[[package]]
name = "windows_aarch64_msvc"
@@ -1100,9 +1083,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
[[package]]
name = "windows_i686_gnu"
@@ -1112,9 +1095,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
[[package]]
name = "windows_i686_msvc"
@@ -1124,9 +1113,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
[[package]]
name = "windows_x86_64_gnu"
@@ -1136,9 +1125,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
[[package]]
name = "windows_x86_64_gnullvm"
@@ -1148,9 +1137,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
[[package]]
name = "windows_x86_64_msvc"
@@ -1160,6 +1149,6 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"

View File

@@ -7,9 +7,9 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
native-tls = "0.2.11"
native-tls = "0.2.12"
postgres-native-tls = "0.5.0"
tokio = { version = "1.36", features=["rt", "macros"] }
tokio = { version = "1.38", features=["rt", "macros"] }
tokio-postgres = "0.7.10"

View File

@@ -1,4 +1,4 @@
FROM rust:1.76
FROM rust:1.79
WORKDIR /source
COPY . .

View File

@@ -1,11 +1,11 @@
FROM swift:5.9 AS build
FROM swift:5.10 AS build
RUN apt-get -q update && apt-get -q install -y libssl-dev
WORKDIR /source
COPY . .
RUN swift build --configuration release
FROM swift:5.9
FROM swift:5.10
WORKDIR /app
COPY --from=build /source/.build/release .
CMD ["/app/PostgresClientKitExample"]

View File

@@ -1,4 +1,5 @@
{
"originHash" : "8eff8c577ba246ce7824d3434839acefced2b1a1d2b1ad700554502538a50558",
"pins" : [
{
"identity" : "bluesocket",
@@ -18,15 +19,6 @@
"version" : "2.0.2"
}
},
{
"identity" : "openssl",
"kind" : "remoteSourceControl",
"location" : "https://github.com/Kitura/OpenSSL.git",
"state" : {
"revision" : "5dc8cb4f971135c17343e3c6df4f28904a0600e2",
"version" : "2.3.1"
}
},
{
"identity" : "postgresclientkit",
"kind" : "remoteSourceControl",
@@ -37,5 +29,5 @@
}
}
],
"version" : 2
"version" : 3
}

View File

@@ -1,4 +1,4 @@
// swift-tools-version:5.8
// swift-tools-version:5.10
import PackageDescription
let package = Package(

View File

@@ -1,10 +1,10 @@
FROM swift:5.9 AS build
FROM swift:5.10 AS build
WORKDIR /source
COPY . .
RUN swift build --configuration release
FROM swift:5.9
FROM swift:5.10
WORKDIR /app
COPY --from=build /source/.build/release .
CMD ["/app/PostgresNIOExample"]

View File

@@ -1,12 +1,22 @@
{
"originHash" : "11b5dcece349a3e56a7a9a7d0af6d0f5b83dff321b43124a01b158ed7aac5302",
"pins" : [
{
"identity" : "postgres-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/vapor/postgres-nio.git",
"state" : {
"revision" : "69ccfdf4c80144d845e3b439961b7ec6cd7ae33f",
"version" : "1.20.2"
"revision" : "5c268768890b062803a49f1358becc478f954265",
"version" : "1.21.5"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-async-algorithms.git",
"state" : {
"revision" : "da4e36f86544cdf733a40d59b3a2267e3a7bbf36",
"version" : "1.0.0"
}
},
{
@@ -81,6 +91,15 @@
"version" : "1.20.1"
}
},
{
"identity" : "swift-service-lifecycle",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server/swift-service-lifecycle.git",
"state" : {
"revision" : "d58e6bf2b1ae2884cf204a8b5bcaaa7aae3c1ff0",
"version" : "2.6.0"
}
},
{
"identity" : "swift-system",
"kind" : "remoteSourceControl",
@@ -91,5 +110,5 @@
}
}
],
"version" : 2
"version" : 3
}

View File

@@ -1,10 +1,10 @@
// swift-tools-version:5.9
// swift-tools-version:5.10
import PackageDescription
let package = Package(
name: "PostgresNIOExample",
dependencies: [
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.20.2")
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.21.5")
],
targets: [
.executableTarget(

View File

@@ -1,4 +1,4 @@
FROM node:21
FROM node:22
WORKDIR /source
COPY . .

View File

@@ -5,7 +5,7 @@
"packages": {
"": {
"dependencies": {
"postgresql-client": "2.10.5"
"postgresql-client": "2.11.0"
}
},
"node_modules/doublylinked": {
@@ -42,9 +42,10 @@
}
},
"node_modules/postgresql-client": {
"version": "2.10.5",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.10.5.tgz",
"integrity": "sha512-R3EC16pUdbgrzk1J2MQLj7jY2TepWurJHoK90nOeLZj1XTpL/+wL1VCneTmclRVKDuKVjFHr+FASV47KrLpAbw==",
"version": "2.11.0",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.11.0.tgz",
"integrity": "sha512-QSPHcWVaiBG+JyASaDojOXvhRmsc2n8j2COdIjUDENFAtFls16Zy240asY2ENzZRQJUMAA8vpR8w4SAdI8jdbw==",
"license": "MIT",
"dependencies": {
"doublylinked": "^2.5.4",
"lightning-pool": "^4.2.2",
@@ -55,8 +56,7 @@
"putil-varhelpers": "^1.6.5"
},
"engines": {
"node": ">=16.0",
"npm": ">=7.0.0"
"node": ">=16.0"
}
},
"node_modules/power-tasks": {

View File

@@ -1,6 +1,6 @@
{
"type": "module",
"dependencies": {
"postgresql-client": "2.10.5"
"postgresql-client": "2.11.0"
}
}

View File

@@ -1,4 +1,4 @@
FROM node:21
FROM node:22
WORKDIR /source
COPY . .

View File

@@ -5,96 +5,138 @@
"packages": {
"": {
"dependencies": {
"@neondatabase/serverless": "0.9.0",
"@neondatabase/serverless": "0.9.4",
"ws": "8.17.1"
}
},
"node_modules/@neondatabase/serverless": {
"version": "0.9.0",
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.9.0.tgz",
"integrity": "sha512-mmJnUAzlzvxNSZuuhI6kgJjH+JgFdBMYUWxihtq/nj0Tjt+Y5UU3W+SvRFoucnd5NObYkuLYQzk+zV5DGFKGJg==",
"version": "0.9.4",
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.9.4.tgz",
"integrity": "sha512-D0AXgJh6xkf+XTlsO7iwE2Q1w8981E1cLCPAALMU2YKtkF/1SF6BiAzYARZFYo175ON+b1RNIy9TdSFHm5nteg==",
"license": "MIT",
"dependencies": {
"@types/pg": "8.6.6"
"@types/pg": "8.11.6"
}
},
"node_modules/@types/node": {
"version": "18.16.3",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.16.3.tgz",
"integrity": "sha512-OPs5WnnT1xkCBiuQrZA4+YAV4HEJejmHneyraIaxsbev5yCEr6KMwINNFP9wQeFIw8FWcoTqF3vQsa5CDaI+8Q=="
"version": "20.14.9",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.9.tgz",
"integrity": "sha512-06OCtnTXtWOZBJlRApleWndH4JsRVs1pDCc8dLSQp+7PpUpX3ePdHyeNSFTeSe7FtKyQkrlPvHwJOW3SLd8Oyg==",
"license": "MIT",
"dependencies": {
"undici-types": "~5.26.4"
}
},
"node_modules/@types/pg": {
"version": "8.6.6",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.6.tgz",
"integrity": "sha512-O2xNmXebtwVekJDD+02udOncjVcMZQuTEQEMpKJ0ZRf5E7/9JJX3izhKUcUifBkyKpljyUM6BTgy2trmviKlpw==",
"version": "8.11.6",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.11.6.tgz",
"integrity": "sha512-/2WmmBXHLsfRqzfHW7BNZ8SbYzE8OSk7i3WjFYvfgRHj7S1xj+16Je5fUKv3lVdVzk/zn9TXOqf+avFCFIE0yQ==",
"license": "MIT",
"dependencies": {
"@types/node": "*",
"pg-protocol": "*",
"pg-types": "^2.2.0"
"pg-types": "^4.0.1"
}
},
"node_modules/obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
"integrity": "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg==",
"license": "MIT"
},
"node_modules/pg-int8": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
"integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==",
"license": "ISC",
"engines": {
"node": ">=4.0.0"
}
},
"node_modules/pg-protocol": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz",
"integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q=="
},
"node_modules/pg-types": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",
"integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==",
"dependencies": {
"pg-int8": "1.0.1",
"postgres-array": "~2.0.0",
"postgres-bytea": "~1.0.0",
"postgres-date": "~1.0.4",
"postgres-interval": "^1.1.0"
},
"node_modules/pg-numeric": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/pg-numeric/-/pg-numeric-1.0.2.tgz",
"integrity": "sha512-BM/Thnrw5jm2kKLE5uJkXqqExRUY/toLHda65XgFTBTFYZyopbKjBe29Ii3RbkvlsMoFwD+tHeGaCjjv0gHlyw==",
"license": "ISC",
"engines": {
"node": ">=4"
}
},
"node_modules/pg-protocol": {
"version": "1.6.1",
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz",
"integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==",
"license": "MIT"
},
"node_modules/pg-types": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-4.0.2.tgz",
"integrity": "sha512-cRL3JpS3lKMGsKaWndugWQoLOCoP+Cic8oseVcbr0qhPzYD5DWXK+RZ9LY9wxRf7RQia4SCwQlXk0q6FCPrVng==",
"license": "MIT",
"dependencies": {
"pg-int8": "1.0.1",
"pg-numeric": "1.0.2",
"postgres-array": "~3.0.1",
"postgres-bytea": "~3.0.0",
"postgres-date": "~2.1.0",
"postgres-interval": "^3.0.0",
"postgres-range": "^1.1.1"
},
"engines": {
"node": ">=10"
}
},
"node_modules/postgres-array": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz",
"integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==",
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-3.0.2.tgz",
"integrity": "sha512-6faShkdFugNQCLwucjPcY5ARoW1SlbnrZjmGl0IrrqewpvxvhSLHimCVzqeuULCbG0fQv7Dtk1yDbG3xv7Veog==",
"license": "MIT",
"engines": {
"node": ">=4"
"node": ">=12"
}
},
"node_modules/postgres-bytea": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz",
"integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==",
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-3.0.0.tgz",
"integrity": "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw==",
"license": "MIT",
"dependencies": {
"obuf": "~1.1.2"
},
"engines": {
"node": ">=0.10.0"
"node": ">= 6"
}
},
"node_modules/postgres-date": {
"version": "1.0.7",
"resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz",
"integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==",
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-2.1.0.tgz",
"integrity": "sha512-K7Juri8gtgXVcDfZttFKVmhglp7epKb1K4pgrkLxehjqkrgPhfG6OO8LHLkfaqkbpjNRnra018XwAr1yQFWGcA==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
"node": ">=12"
}
},
"node_modules/postgres-interval": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz",
"integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==",
"dependencies": {
"xtend": "^4.0.0"
},
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-3.0.0.tgz",
"integrity": "sha512-BSNDnbyZCXSxgA+1f5UU2GmwhoI0aU5yMxRGO8CdFEcY2BQF9xm/7MqKnYoM1nJDk8nONNWDk9WeSmePFhQdlw==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
"node": ">=12"
}
},
"node_modules/postgres-range": {
"version": "1.1.4",
"resolved": "https://registry.npmjs.org/postgres-range/-/postgres-range-1.1.4.tgz",
"integrity": "sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w==",
"license": "MIT"
},
"node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
"license": "MIT"
},
"node_modules/ws": {
"version": "8.17.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
@@ -114,14 +156,6 @@
"optional": true
}
}
},
"node_modules/xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",
"integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==",
"engines": {
"node": ">=0.4"
}
}
}
}

View File

@@ -1,7 +1,7 @@
{
"type": "module",
"dependencies": {
"@neondatabase/serverless": "0.9.0",
"@neondatabase/serverless": "0.9.4",
"ws": "8.17.1"
}
}

View File

@@ -21,8 +21,6 @@ def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
[
# eviction might be the first one after an attach to access the layers
".*unexpectedly on-demand downloading remote layer .* for task kind Eviction",
# detach can happen before we get to validate the generation number
".*deletion backend: Dropped remote consistent LSN updates for tenant.*",
]
)
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
@@ -58,10 +56,6 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N
env.pageserver.allowed_errors.extend(
[
# This fixture detaches the tenant, and tests using it will tend to re-attach it
# shortly after. There may be un-processed deletion_queue validations from the
# initial attachment
".*Dropped remote consistent LSN updates.*",
# This fixture is for tests that will intentionally generate 400 responses
".*Error processing HTTP request: Bad request",
]

View File

@@ -211,7 +211,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
def check_pageserver(expect_success: bool, **conn_kwargs):
check_connection(
env.pageserver,
f"get_last_record_rlsn {env.initial_tenant} {timeline_id}",
f"show {env.initial_tenant}",
expect_success,
**conn_kwargs,
)

View File

@@ -14,11 +14,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)
env = neon_env_builder.init_start()
for pageserver in env.pageservers:
# This test dual-attaches a tenant, one of the pageservers will therefore
# be running with a stale generation.
pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
env.neon_cli.create_branch("test_change_pageserver")
endpoint = env.endpoints.create_start("test_change_pageserver")

View File

@@ -0,0 +1,23 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.parametrize(
"sql_func",
[
"trigger_panic",
"trigger_segfault",
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
endpoint.safe_psql(f"SELECT {sql_func}();")

View File

@@ -39,9 +39,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
ps_http = env.pageserver.http_client()

View File

@@ -1,3 +1,4 @@
import time
from pathlib import Path
from fixtures.log_helper import log
@@ -72,3 +73,46 @@ WITH (fillfactor='100');
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
log.info(f"working set size after some index access of a few select pages only {blocks}")
assert blocks < 10
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.4'")
cur.execute(
"create table t(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute("insert into t (pk) values (generate_series(1,1000000))")
time.sleep(2)
before_10k = time.monotonic()
cur.execute("select sum(count) from t where pk between 10000 and 20000")
time.sleep(2)
before_1k = time.monotonic()
cur.execute("select sum(count) from t where pk between 1000 and 2000")
after = time.monotonic()
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_1k + 1)})")
estimation_1k = cur.fetchall()[0][0]
log.info(f"Working set size for selecting 1k records {estimation_1k}")
cur.execute(f"select approximate_working_set_size_seconds({int(after - before_10k + 1)})")
estimation_10k = cur.fetchall()[0][0]
log.info(f"Working set size for selecting 10k records {estimation_10k}")
cur.execute("select pg_table_size('t')")
size = cur.fetchall()[0][0] // 8192
log.info(f"Table size {size} blocks")
assert estimation_1k >= 20 and estimation_1k <= 40
assert estimation_10k >= 200 and estimation_10k <= 400

View File

@@ -50,7 +50,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.3",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.3", "1.2", "1.1", "1.0"]
all_versions = ["1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:

View File

@@ -249,10 +249,6 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
assert timeline["remote_consistent_lsn"] == timeline["remote_consistent_lsn_visible"]
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
main_pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# Now advance the generation in the control plane: subsequent validations
# from the running pageserver will fail. No more deletions should happen.
env.storage_controller.attach_hook_issue(env.initial_tenant, other_pageserver.id)
@@ -397,8 +393,6 @@ def test_deletion_queue_recovery(
# validated before restart.
assert get_deletion_queue_executed(ps_http) == before_restart_depth
else:
main_pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
# If we lost the attachment, we should have dropped our pre-restart deletions.
assert get_deletion_queue_dropped(ps_http) == before_restart_depth
@@ -553,13 +547,6 @@ def test_multi_attach(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will intentionally create situations where stale deletions happen from non-latest-generation
# nodes when the tenant is multiply-attached
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# Initially, the tenant will be attached to the first pageserver (first is default in our test harness)
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[0], tenant_id, "Active"))
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)

View File

@@ -16,6 +16,8 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
@@ -59,7 +61,7 @@ def evict_random_layers(
@pytest.mark.parametrize("seed", [1, 2, 3])
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver, seed: int):
"""
Issue many location configuration changes, ensure that tenants
remain readable & we don't get any unexpected errors. We should
@@ -73,6 +75,20 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=s3_storage(),
)
neon_env_builder.control_plane_compute_hook_api = (
f"http://{make_httpserver.host}:{make_httpserver.port}/notify-attach"
)
def ignore_notify(request: Request):
# This test does all its own compute configuration (by passing explicit pageserver ID to Workload functions),
# so we send controller notifications to /dev/null to prevent it fighting the test for control of the compute.
log.info(f"Ignoring storage controller compute notification: {request.json}")
return Response(status=200)
make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(
ignore_notify
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
pageservers = env.pageservers
@@ -83,9 +99,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We will make no effort to avoid stale attachments
".*Dropped remote consistent LSN updates.*",
".*Dropping stale deletions.*",
# page_service_conn_main{peer_addr=[::1]:41176}: query handler for 'pagestream 3b19aec5038c796f64b430b30a555121 d07776761d44050b8aab511df1657d83' failed: Tenant 3b19aec5038c796f64b430b30a555121 not found
".*query handler.*Tenant.*not found.*",
# page_service_conn_main{peer_addr=[::1]:45552}: query handler for 'pagestream 414ede7ad50f775a8e7d9ba0e43b9efc a43884be16f44b3626482b6981b2c745' failed: Tenant 414ede7ad50f775a8e7d9ba0e43b9efc is not active
@@ -102,6 +115,15 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# Discourage the storage controller from interfering with the changes we will make directly on the pageserver
env.storage_controller.tenant_policy_update(
tenant_id,
{
"scheduling": "Stop",
},
)
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy Stop.*")
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(seed)

View File

@@ -355,13 +355,6 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
# We are about to forcibly drop local dirs. Storage controller will increment generation in re-attach before
# we later increment when actually attaching it again, leading to skipping a generation and potentially getting
# these warnings if there was a durable but un-executed deletion list at time of restart.
env.pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -1144,10 +1144,6 @@ def test_sharding_split_failures(
)
for ps in env.pageservers:
# When we do node failures and abandon a shard, it will de-facto have old generation and
# thereby be unable to publish remote consistent LSN updates
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
# If we're using a failure that will panic the storage controller, all background
# upcalls from the pageserver can fail
ps.allowed_errors.append(".*calling control plane generation validation API failed.*")

View File

@@ -60,11 +60,6 @@ def test_storage_controller_smoke(
neon_env_builder.num_pageservers = 3
env = neon_env_builder.init_configs()
for pageserver in env.pageservers:
# This test detaches tenants during migration, which can race with deletion queue operations,
# during detach we only do an advisory flush, we don't wait for it.
pageserver.allowed_errors.extend([".*Dropped remote consistent LSN updates.*"])
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.try_start()
env.storage_controller.start()
@@ -484,9 +479,6 @@ def test_storage_controller_compute_hook(
# Start running
env = neon_env_builder.init_start()
# We will to an unclean migration, which will result in deletion queue warnings
env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates for tenant.*")
# Initial notification from tenant creation
assert len(notifications) == 1
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
@@ -1054,13 +1046,6 @@ def test_storage_controller_heartbeats(
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
for node_id in offline_node_ids:
env.get_pageserver(node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
if len(offline_node_ids) > 1:
env.get_pageserver(node_id).allowed_errors.append(
".*Scheduling error when marking pageserver.*offline.*",
@@ -1518,49 +1503,6 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
workload.validate()
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = env.storage_controller.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
"""
Graceful reststart of storage controller clusters use the drain and
@@ -1601,10 +1543,10 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
# Perform a graceful rolling restart
for ps in env.pageservers:
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(env, ps.id, "PauseForRestart", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -1614,12 +1556,12 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
assert sum(shard_counts.values()) == total_shards
ps.restart()
poll_node_status(env, ps.id, "Active", max_attempts=10, backoff=1)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(env, ps.id, "Active", max_attempts=6, backoff=5)
env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
@@ -1657,15 +1599,15 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
ps_id_to_drain = env.pageservers[0].id
retryable_node_operation(
env.storage_controller.retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id),
ps_id_to_drain,
max_attempts=3,
backoff=2,
)
poll_node_status(env, ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
env.storage_controller.cancel_node_drain(ps_id_to_drain)
poll_node_status(env, ps_id_to_drain, "Active", max_attempts=6, backoff=2)
env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2)

View File

@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
with sub.cursor() as scur:
wait_until(10, 0.5, check_that_changes_propagated)
wait_until(60, 0.5, check_that_changes_propagated)

View File

@@ -320,10 +320,6 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
assert not config_path.exists(), "detach did not remove config file"
# The re-attach's increment of the generation number may invalidate deletion queue
# updates in flight from the previous attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
env.pageserver.tenant_attach(tenant_id)
wait_until(
number_of_iterations=5,

View File

@@ -67,8 +67,9 @@ def test_tenant_delete_smoke(
# first try to delete non existing tenant
tenant_id = TenantId.generate()
env.pageserver.allowed_errors.append(".*NotFound.*")
env.pageserver.allowed_errors.append(".*simulated failure.*")
env.pageserver.allowed_errors.extend(
[".*NotFound.*", ".*simulated failure.*", ".*failed to delete .+ objects.*"]
)
# Check that deleting a non-existent tenant gives the expected result: this is a loop because we
# may need to retry on some remote storage errors injected by the test harness

View File

@@ -76,10 +76,6 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str):
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# Our re-attach may race with the deletion queue processing LSN updates
# from the original attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
@@ -349,10 +345,6 @@ def test_detach_while_attaching(
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# Our re-attach may race with the deletion queue processing LSN updates
# from the original attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
@@ -422,10 +414,6 @@ def test_detach_while_activating(
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# Our re-attach may race with the deletion queue processing LSN updates
# from the original attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
data_id = 1
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)

View File

@@ -203,8 +203,6 @@ def test_tenant_relocation(
[
# Needed for detach polling on the original pageserver
f".*NotFound: tenant {tenant_id}.*",
# We will dual-attach in this test, so stale generations are expected
".*Dropped remote consistent LSN updates.*",
]
)

View File

@@ -10,6 +10,7 @@ from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
wait_for_wal_insert_lsn,
)
@@ -710,3 +711,90 @@ def mask_model_inputs(x):
return newlist
else:
return x
@pytest.mark.parametrize("zero_gc", [True, False])
def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path, zero_gc: bool):
"""
Compare a LSN lease to a read-only branch for synthetic size calculation.
They should have the same effect.
"""
conf = {
"pitr_interval": "0s" if zero_gc else "3600s",
"gc_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=conf)
ro_branch_res = insert_with_action(
env, env.initial_tenant, env.initial_timeline, test_output_dir, action="branch"
)
tenant, timeline = env.neon_cli.create_tenant(conf=conf)
lease_res = insert_with_action(env, tenant, timeline, test_output_dir, action="lease")
assert_size_approx_equal(lease_res, ro_branch_res)
def insert_with_action(
env: NeonEnv,
tenant: TenantId,
timeline: TimelineId,
test_output_dir: Path,
action: str,
) -> int:
"""
Inserts some data on the timeline, perform an action, and insert more data on the same timeline.
Returns the size at the end of the insertion.
Valid actions:
- "lease": Acquires a lease.
- "branch": Creates a child branch but never writes to it.
"""
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
initial_size = client.tenant_size(tenant)
log.info(f"initial size: {initial_size}")
with ep.cursor() as cur:
cur.execute(
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
last_flush_lsn = wait_for_last_flush_lsn(env, ep, tenant, timeline)
if action == "lease":
res = client.timeline_lsn_lease(tenant, timeline, last_flush_lsn)
log.info(f"result from lsn_lease api: {res}")
elif action == "branch":
ro_branch = env.neon_cli.create_branch(
"ro_branch", tenant_id=tenant, ancestor_start_lsn=last_flush_lsn
)
log.info(f"{ro_branch=} created")
else:
raise AssertionError("Invalid action type, only `lease` and `branch`are accepted")
with ep.cursor() as cur:
cur.execute(
"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
cur.execute(
"CREATE TABLE t2 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
cur.execute(
"CREATE TABLE t3 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
last_flush_lsn = wait_for_last_flush_lsn(env, ep, tenant, timeline)
# Avoid flakiness when calculating logical size.
flush_ep_to_pageserver(env, ep, tenant, timeline)
size_after_action_and_insert = client.tenant_size(tenant)
log.info(f"{size_after_action_and_insert=}")
size_debug_file = open(test_output_dir / f"size_debug_{action}.html", "w")
size_debug = client.tenant_size_debug(tenant)
size_debug_file.write(size_debug)
return size_after_action_and_insert

View File

@@ -386,10 +386,6 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
# generation nubmers out of order.
env.pageserver.allowed_errors.append(".*Generation .+ is less than existing .+")
# Our multiple creation requests will advance generation quickly, and when we skip
# a generation number we can generate these warnings
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates for tenant .+")
# Timeline::flush_and_shutdown cannot tell if it is hitting a failure because of
# an incomplete attach, or some other problem. In the field this should be rare,
# so we allow it to log at WARN, even if it is occasionally a false positive.