Compare commits

...

29 Commits

Author SHA1 Message Date
Conrad Ludgate
b0411e612a use flag-bearer 2025-04-02 12:51:27 +01:00
Conrad Ludgate
0579533ac8 remove dynamic limiter internal Box<dyn> 2025-04-02 12:45:04 +01:00
Conrad Ludgate
99c0da9607 proxy: simplify dynamic limiter impl 2025-04-02 12:40:45 +01:00
a-masterov
c179d098ef Increase the timeout for extensions upgrade tests (on schedule). (#11406)
## Problem
Sometimes the forced extension upgrade test fails (on schedule) due to a
timeout.
## Summary of changes
The timeout is increased to 60 mins.
2025-04-02 11:18:37 +00:00
Peter Bendel
4bc6dbdd5f use a prod-like shared_buffers size for some perf unit tests (#11373)
## Problem

In Neon DBaaS we adjust the shared_buffers to the size of the compute,
or better described we adjust the max number of connections to the
compute size and we adjust the shared_buffers size to the number of max
connections according to about the following sizes
`2 CU: 225mb; 4 CU: 450mb; 8 CU: 900mb`

[see](877e33b428/goapp/controlplane/internal/pkg/compute/computespec/pg_settings.go (L405))

## Summary of changes

We should run perf unit tests with settings that is realistic for a
paying customer and select 8 CU as the reference for those tests.
2025-04-02 10:43:05 +00:00
John Spray
7dc8370848 storcon: do graceful migrations from chaos injector (#11028)
## Problem

Followup to https://github.com/neondatabase/neon/pull/10913

Existing chaos injection just does simple cutovers to secondary
locations. Let's also exercise code for doing graceful migrations. This
should implicitly test how such migrations cope with overlapping with
service restarts.

## Summary of changes
2025-04-02 10:08:05 +00:00
Alex Chi Z.
c4fc602115 feat(pageserver): support synthetic size calculation for invisible branches (#11335)
## Problem

ref https://github.com/neondatabase/neon/issues/11279


Imagine we have a branch with 3 snapshots A, B, and C:
```
base---+---+---+---main
        \-A \-B \-C
base=100G, base-A=1G, A-B=1G, B-C=1G, C-main=1G
```
at this point, the synthetic size should be 100+1+1+1+1=104G.

after the deletion, the structure looks like:
```
base---+---+---+
       \-A \-B \-C
```
If we simply assume main never exists, the size will be calculated as
size(A) + size(B) + size(C)=300GB, which obviously is not what the user
would expect.

The correct way to do this is to assume part of main still exists, that
is to say, set C-main=1G:
```
base---+---+---+main
       \-A \-B \-C
```
And we will get the correct synthetic size of 100G+1+1+1=103G.


## Summary of changes

* Do not generate gc cutoff point for invisible branches.
* Use the same LSN as the last branchpoint for branch end.
* Remove test_api_handler for mark_invisible.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-01 18:50:58 +00:00
Konstantin Knizhnik
02936b82c5 Fix effective_lsn calculation for prefetch (#11219)
## Problem

See  https://neondb.slack.com/archives/C04DGM6SMTM/p1741594233757489

Consider the following scenario:

1. Backend A wants to prefetch some block B
2. Backend A checks that block B is not present in shared buffer
3. Backend A registers new prefetch request and calls
prefetch_do_request
4. prefetch_do_request calls neon_get_request_lsns
5. neon_get_request_lsns obtains LwLSN for block B
6. Backend B downloads B, updates and wallogs it (let say to Lsn1)
7. Block B is once again thrown from shared buffers, its LwLSN is set to
Lsn1
8. Backend A obtains current flush LSN, let's say that it is Lsn1
9. Backend A stores Lsn1 as effective_lsn in prefetch slot.
10. Backend A reads page B with LwLSN=Lsn1
11. Backend A finds in prefetch ring response for prefetch request for
block B with effective_lsn=Lsn1, so that it satisfies
neon_prefetch_response_usable condition
12. Backend A uses deteriorated version of the page!

## Summary of changes

Use `not_modified_since` as `effective_lsn`. 
It should not cause some degrade of performance because we store LwLSN
when it was not found in LwLSN hash, so if page is not changed till
prefetch response is arrived, then LwLSN should not be changed.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-01 15:48:02 +00:00
Arpad Müller
1fad1abb24 storcon: timeline deletion improvements and fixes (#11334)
This PR contains a bunch of smaller followups and fixes of the original
PR #11058. most of these implement suggestions from Arseny:

* remove `Queryable, Selectable` from `TimelinePersistence`: they are
not needed.
* no `Arc` around `CancellationToken`: it itself is an arc wrapper
* only schedule deletes instead of scheduling excludes and deletes
* persist and delete deletion ops
* delete rows in timelines table upon tenant and timeline deletion
* set `deleted_at` for timelines we are deleting before we start any
reconciles: this flag will help us later to recognize half-executed
deletions, or when we crashed before we could remove the timeline row
but after we removed the last pending op (handling these situations are
left for later).

Part of #9011
2025-04-01 15:16:33 +00:00
Arpad Müller
016068b966 API spec: add safekeepers field returned by storcon (#11385)
Add an optional `safekeepers` field to `TimelineInfo` which is returned
by the storcon upon timeline creation if the
`--timelines-onto-safekeepers` flag is enabled. It contains the list of
safekeepers chosen.

Other contexts where we return `TimelineInfo` do not contain the
`safekeepers` field, sadly I couldn't make this more type safe like done
in Rust via `TimelineCreateResponseStorcon`, as there is no way of
flattening or inheritance (and I don't that duplicating the entire type
for some minor type safety improvements is worth it).

The storcon side has been done in #11058.

Part of https://github.com/neondatabase/cloud/issues/16176
cc https://github.com/neondatabase/cloud/issues/16796
2025-04-01 12:39:10 +00:00
Erik Grinaker
80596feeaa pageserver: invert CompactFlags::NoYield as YieldForL0 (#11382)
## Problem

`CompactFlags::NoYield` was a bit inconvenient, since every caller
except for the background compaction loop should generally set it (e.g.
HTTP API calls, tests, etc). It was also inconsistent with
`CompactionOutcome::YieldForL0`.

## Summary of changes

Invert `CompactFlags::NoYield` as `CompactFlags::YieldForL0`. There
should be no behavioral changes.
2025-04-01 11:43:58 +00:00
Erik Grinaker
225cabd84d pageserver: update upload queue TODOs (#11377)
Update some upload queue TODOs, particularly to track
https://github.com/neondatabase/neon/issues/10283, which I won't get
around to.
2025-04-01 11:38:12 +00:00
Alexey Kondratov
557127550c feat(compute): Add compute_ctl_up metric (#11376)
## Problem

For computes running inside NeonVM, the actual compute image tag is
buried inside the NeonVM spec, and we cannot get it as part of standard
k8s container metrics (it's always an image and a tag of the NeonVM
runner container). The workaround we currently use is to extract the
running computes info from the control plane database with SQL. It has
several drawbacks: i) it's complicated, separate DB per region; ii) it's
slow; iii) it's still an indirect source of info, i.e. k8s state could
be different from what the control plane expects.

## Summary of changes

Add a new `compute_ctl_up` gauge metric with `build_tag` and `status`
labels. It will help us to both overview what are the tags/versions of
all running computes; and to break them down by current status (`empty`,
`running`, `failed`, etc.)

Later, we could introduce low cardinality (no endpoint or compute ids)
streaming aggregates for such metrics, so they will be blazingly fast
and usable for monitoring the fleet-wide state.
2025-04-01 08:51:17 +00:00
Konstantin Knizhnik
cfe3e6d4e1 Remove loop from pageserver_try_receive (#11387)
## Problem

Commit
3da70abfa5
cause noticeable performance regression (40% in update-with-prefetch in
test_bulk_update):
https://neondb.slack.com/archives/C04BLQ4LW7K/p1742633167580879

## Summary of changes

Remove loop from pageserver_try_receive to make it fetch not more than
one response. There is still loop in `pump_prefetch_state` which can
fetch as many responses as available.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-31 19:49:32 +00:00
Alex Chi Z.
47d47000df fix(pageserver): passthrough lsn lease in storcon API (#11386)
## Problem

part of https://github.com/neondatabase/cloud/issues/23667

## Summary of changes

lsn_lease API can only be used on pageservers. This patch enables
storcon passthrough.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-31 19:16:42 +00:00
Matthias van de Meent
e5b95bc9dc Neon LFC/prefetch: Improve page read handling (#11380)
Previously we had different meanings for the bitmask of vector IOps.
That has now been unified to "bit set = final result, no more
scribbling".

Furthermore, the LFC read path scribbled on pages that were already
read; that's probably not a good thing so that's been fixed too. In
passing, the read path of LFC has been updated to read only the
requested pages into the provided buffers, thus reducing the IO size of
vectorized IOs.

## Problem

## Summary of changes
2025-03-31 17:04:00 +00:00
Alex Chi Z.
0ee5bfa2fc fix(pageserver): allow sibling archived branch for detaching (#11383)
## Problem

close https://github.com/neondatabase/neon/issues/11379

## Summary of changes

Remove checks around archived branches for detach v2. I also updated the
comments `ancestor_retain_lsn`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-31 16:32:55 +00:00
Fedor Dikarev
00bcafe82e chore(ci): upgrade stats action with docker images from ghcr.io (#11378)
## Problem
Current version of GitHub Workflow Stats action pull docker images from
DockerHub, that could be an issue with the new pull limits on DockerHub
side.

## Summary of changes
Switch to version `v0.2.2`, with docker images hosted on `ghcr.io`
2025-03-31 14:21:07 +00:00
Conrad Ludgate
ed117af73e chore(proxy/tokio-postgres): remove phf from sqlstate and switch to tracing (#11249)
In sqlstate, we have a manual `phf` construction, which is not
explicitly guaranteed to be stable - you're intended to use a build.rs
or the macro to make sure it's constructed correctly each time. This was
inherited from tokio-postgres upstream, which has the same issue
(https://github.com/rust-phf/rust-phf/pull/321#issuecomment-2724521193).

We don't need this encoding of sqlstate, so I've switched it to simply
parse 5 bytes
(https://www.postgresql.org/docs/current/errcodes-appendix.html).

While here, I switched out log for tracing.
2025-03-31 12:35:51 +00:00
Konstantin Knizhnik
21a891a06d Fix IS_LOCAL_REL macro (first class has oid=FirstNormalObjectId) (#11369)
## Problem

Macro IS_LOCAL_REL used for DEBUG_COMPARE_LOCAL mode use greater-than
rather than greater-or-equal comparison while first table really is
assigned FirstNormalObjectId.

## Summary of changes

Replace strict greater with greater-or-equal comparison.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-31 11:16:35 +00:00
Alexander Bayandin
30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00
Erik Grinaker
db5384e1b0 pageserver: remove L0 flush upload wait (#11196)
## Problem

Previously, L0 flushes would wait for uploads, as a simple form of
backpressure. However, this prevented flush pipelining and upload
parallelism. It has since been disabled by default and replaced by L0
compaction backpressure.

Touches https://github.com/neondatabase/cloud/issues/24664.

## Summary of changes

This patch removes L0 flush upload waits, along with the
`l0_flush_wait_upload`. This can't be merged until the setting has been
removed across the fleet.
2025-03-30 13:14:04 +00:00
JC Grünhage
5cb6a4bc8b fix(ci): use the right sha in release PRs (#11365)
## Problem

`github.sha` contains a merge commit of `head` and `base` if we're in a
PR. In release PRs, this makes no sense, because we fast-forward the
`base` branch to contain the changes from `head`.

Even though we correctly use `${{ github.event.pull_request.head.sha ||
github.sha }}` to reference the git commit when building artifacts, we
don't use that when checking out code, because we want to test the merge
of head and base usually. In the case of release PRs, we definitely
always want to test on the head sha though, because we're going to
forward that, and it already has the base sha as a parent, so the merge
would end up with the same tree anyway.

As a side effect, not checking out `${{
github.event.pull_request.head.sha || github.sha }}` also caused
https://github.com/neondatabase/neon/actions/runs/13986389780/job/39173256184#step:6:49
to say `release-tag=release-compute-8187`, while
https://github.com/neondatabase/neon/actions/runs/14084613121/job/39445314780#step:6:48
is talking about `build-tag=release-compute-8186`

## Summary of changes
Run a few things on `github.event.pull_request.head.sha`, if we're in a
release PR.
2025-03-28 11:56:24 +00:00
Folke Behrens
1dbf40ee2c proxy: Update redis crate (#11372) 2025-03-28 11:43:52 +00:00
Fedor Dikarev
939354abea chore(ci): pin python base images to sha (#11367)
Similar to how we pin base `debian` images, also pin `python` base
images, so we better cache them and have reproducible builds.
2025-03-27 17:42:28 +00:00
Fedor Dikarev
1d5d168626 impr(ci): use hetzner buckets for cache (#11364)
## Problem
Occasionally getting data from GH cache could be slow, with less than
10MB/s and taking 5+ minutes to download cache:
```
Received 20971520 of 2987085791 (0.7%), 9.9 MBs/sec
Received 50331648 of 2987085791 (1.7%), 15.9 MBs/sec
...
Received 1065353216 of 2987085791 (35.7%), 4.8 MBs/sec
Received 1065353216 of 2987085791 (35.7%), 4.7 MBs/sec
...
```

https://github.com/neondatabase/neon/actions/runs/13956437454/job/39068664599#step:7:17

Resulting in getting cache even longer that build time.

## Summary of changes
Switch to the caches, that are closer to the runners, and they provided
stable throughput about 70-80MB/s
2025-03-27 11:11:45 +00:00
Folke Behrens
b40dd54732 compute-node: Add some debugging tools to image (#11352)
## Problem

Some useful debugging tools are missing from the compute image and
sometimes it's impossible to install them because memory is tightly
packed.

## Summary of changes

Add the following tools: iproute2, lsof, screen, tcpdump.
The other changes come from sorting the packages alphabetically.

```bash
$ docker image inspect ghcr.io/neondatabase/vm-compute-node-v16:7555 | jaq '.[0].Size'
1389759645
$ docker image inspect ghcr.io/neondatabase/vm-compute-node-v16:14083125313 | jaq '.[0].Size'
1396051101
$ echo $((1396051101 - 1389759645))
6291456
```
2025-03-27 11:09:27 +00:00
Folke Behrens
4bb7087d4d proxy: Fix some clippy warnings coming in next versions (#11359) 2025-03-26 10:50:16 +00:00
Arpad Müller
5f3551e405 Add "still waiting for task" for slow shutdowns (#11351)
To help with narrowing down
https://github.com/neondatabase/cloud/issues/26362, we make the case
more noisy where we are wait for the shutdown of a specific task (in the
case of that issue, the `gc_loop`).
2025-03-24 17:29:44 +00:00
244 changed files with 2802 additions and 3095 deletions

View File

@@ -8,6 +8,7 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- AWS_ECR_REGION
- AZURE_DEV_CLIENT_ID
- AZURE_DEV_REGISTRY_NAME
- AZURE_DEV_SUBSCRIPTION_ID
@@ -15,23 +16,25 @@ config-variables:
- AZURE_PROD_REGISTRY_NAME
- AZURE_PROD_SUBSCRIPTION_ID
- AZURE_TENANT_ID
- BENCHMARK_INGEST_TARGET_PROJECTID
- BENCHMARK_LARGE_OLTP_PROJECTID
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- BENCHMARK_INGEST_TARGET_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- SLACK_ON_CALL_QA_STAGING_STREAM
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION
- BENCHMARK_LARGE_OLTP_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

View File

@@ -128,29 +128,49 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}

View File

@@ -37,8 +37,14 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
- name: Cache poetry deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}

View File

@@ -48,8 +48,13 @@ jobs:
submodules: true
- name: Cache cargo deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src

View File

@@ -5,6 +5,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
outputs:
build-tag:
description: "Tag for the current workflow run"
@@ -27,6 +30,9 @@ on:
release-pr-run-id:
description: "Only available if `run-kind in [storage-release, proxy-release, compute-release]`. Contains the run ID of the `Build and Test` workflow, assuming one with the current commit can be found."
value: ${{ jobs.tags.outputs.release-pr-run-id }}
sha:
description: "github.event.pull_request.head.sha on release PRs, github.sha otherwise"
value: ${{ jobs.tags.outputs.sha }}
permissions: {}
@@ -45,6 +51,7 @@ jobs:
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
release-pr-run-id: ${{ steps.release-pr-run-id.outputs.release-pr-run-id }}
sha: ${{ steps.sha.outputs.sha }}
permissions:
contents: read
steps:
@@ -54,10 +61,6 @@ jobs:
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get run kind
id: run-kind
env:
@@ -78,6 +81,23 @@ jobs:
run: |
echo "run-kind=$RUN_KIND" | tee -a $GITHUB_OUTPUT
- name: Get the right SHA
id: sha
env:
SHA: >
${{
contains(fromJSON('["storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), steps.run-kind.outputs.run-kind)
&& fromJSON(inputs.github-event-json).pull_request.head.sha
|| github.sha
}}
run: |
echo "sha=$SHA" | tee -a $GITHUB_OUTPUT
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
ref: ${{ steps.sha.outputs.sha }}
- name: Get build tag
id: build-tag
env:
@@ -143,7 +163,7 @@ jobs:
if: ${{ contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), steps.run-kind.outputs.run-kind) }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
CURRENT_SHA: ${{ github.sha }}
run: |
RELEASE_PR_RUN_ID=$(gh api "/repos/${GITHUB_REPOSITORY}/actions/runs?head_sha=$CURRENT_SHA" | jq '[.workflow_runs[] | select(.name == "Build and Test") | select(.head_branch | test("^rc/release(-(proxy|compute))?/[0-9]{4}-[0-9]{2}-[0-9]{2}$"; "s"))] | first | .id // ("Failed to find Build and Test run from RC PR!" | halt_error(1))')
echo "release-pr-run-id=$RELEASE_PR_RUN_ID" | tee -a $GITHUB_OUTPUT

View File

@@ -63,8 +63,13 @@ jobs:
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -129,15 +134,25 @@ jobs:
- name: Cache postgres v17 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -203,32 +218,57 @@ jobs:
- name: Cache postgres v14 build
id: cache_pg
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache cargo deps (only for v17)
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
@@ -238,8 +278,13 @@ jobs:
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

View File

@@ -80,6 +80,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
build-build-tools-image:
needs: [ check-permissions ]
@@ -248,8 +249,13 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cache poetry deps
uses: actions/cache@d4323d4df104b026a6aa633fdb11d772146be0bf # v4.2.2
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
@@ -540,6 +546,7 @@ jobs:
uses: ./.github/workflows/trigger-e2e-tests.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
secrets: inherit
neon-image-arch:
@@ -563,6 +570,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0
@@ -672,6 +680,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
ref: ${{ needs.meta.outputs.sha }}
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
- uses: docker/setup-buildx-action@b5ca514318bd6ebac0fb2aedd5d36ec1b5c232a2 # v3.10.0

View File

@@ -55,7 +55,7 @@ jobs:
echo tag=${tag} >> ${GITHUB_OUTPUT}
- name: Test extension upgrade
timeout-minutes: 20
timeout-minutes: 60
env:
NEW_COMPUTE_TAG: latest
OLD_COMPUTE_TAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}

View File

@@ -23,7 +23,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 2 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -43,7 +43,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 48 hours
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"
@@ -63,7 +63,7 @@ jobs:
egress-policy: audit
- name: Export Workflow Run for the past 30 days
uses: neondatabase/gh-workflow-stats-action@4c998b25ab5cc6588b52a610b749531f6a566b6b # v0.2.1
uses: neondatabase/gh-workflow-stats-action@701b1f202666d0b82e67b4d387e909af2b920127 # v0.2.2
with:
db_uri: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
db_table: "gh_workflow_stats_neon"

View File

@@ -9,6 +9,9 @@ on:
github-event-name:
type: string
required: true
github-event-json:
type: string
required: true
defaults:
run:
@@ -48,6 +51,7 @@ jobs:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
github-event-json: ${{ inputs.github-event-json || toJSON(github.event) }}
trigger-e2e-tests:
needs: [ meta ]

64
Cargo.lock generated
View File

@@ -148,9 +148,9 @@ dependencies = [
[[package]]
name = "arc-swap"
version = "1.6.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "archery"
@@ -2248,6 +2248,17 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flag-bearer"
version = "0.1.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8feaa1b7a5ad6e6dd7791d42d36c1a25004f1c25eae9ab7b904c864109d8260"
dependencies = [
"parking_lot 0.12.1",
"pin-list",
"pin-project-lite",
]
[[package]]
name = "flagset"
version = "0.4.6"
@@ -3861,11 +3872,10 @@ dependencies = [
[[package]]
name = "num-bigint"
version = "0.4.3"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
@@ -3914,11 +3924,10 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.45"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [
"autocfg",
"num-traits",
]
@@ -3947,9 +3956,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.15"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
"libm",
@@ -4556,6 +4565,16 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-list"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c0987a7464afc0d593f13429732ef87e9a6c8e7909a1a22faeff7e1d2159d"
dependencies = [
"pin-project-lite",
"pinned-aliasable",
]
[[package]]
name = "pin-project"
version = "1.1.9"
@@ -4588,6 +4607,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pinned-aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0f9ae89bf0ed03b69ac1f3f7ea2e6e09b4fa5448011df2e67d581c2b850b7b"
[[package]]
name = "pkcs1"
version = "0.7.5"
@@ -5081,6 +5106,7 @@ dependencies = [
"ed25519-dalek",
"env_logger",
"fallible-iterator",
"flag-bearer",
"flate2",
"framed-websockets",
"futures",
@@ -5362,26 +5388,25 @@ dependencies = [
[[package]]
name = "redis"
version = "0.25.2"
version = "0.29.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d64e978fd98a0e6b105d066ba4889a7301fca65aeac850a877d8797343feeb"
checksum = "b110459d6e323b7cda23980c46c77157601199c9da6241552b284cd565a7a133"
dependencies = [
"async-trait",
"arc-swap",
"bytes",
"combine",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-rustls 0.25.0",
"tokio-rustls 0.26.0",
"tokio-util",
"url",
]
@@ -7217,15 +7242,14 @@ dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"phf",
"pin-project-lite",
"postgres-protocol2",
"postgres-types2",
"serde",
"tokio",
"tokio-util",
"tracing",
]
[[package]]

View File

@@ -50,7 +50,7 @@ license = "Apache-2.0"
[workspace.dependencies]
ahash = "0.8"
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
arc-swap = "1.7"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
flate2 = "1.0.26"
@@ -130,7 +130,7 @@ nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal"
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
@@ -146,7 +146,7 @@ procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }

View File

@@ -1916,26 +1916,30 @@ RUN apt update && \
;; \
esac && \
apt install --no-install-recommends -y \
ca-certificates \
gdb \
liblz4-1 \
libreadline8 \
iproute2 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libossp-uuid16 \
libcurl4 \
libevent-2.1-7 \
libgeos-c1v5 \
liblz4-1 \
libossp-uuid16 \
libprotobuf-c1 \
libreadline8 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4 \
libevent-2.1-7 \
locales \
lsof \
procps \
ca-certificates \
rsyslog \
screen \
tcpdump \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -45,7 +45,9 @@ use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::compute::{
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
@@ -57,10 +59,6 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
@@ -147,7 +145,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let build_tag = runtime.block_on(init())?;
runtime.block_on(init())?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -174,8 +172,6 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
build_tag,
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
@@ -189,7 +185,7 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}
async fn init() -> Result<String> {
async fn init() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
@@ -199,12 +195,9 @@ async fn init() -> Result<String> {
}
});
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok(build_tag)
Ok(())
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {

View File

@@ -20,6 +20,7 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
@@ -35,6 +36,7 @@ use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::rsyslog::{
@@ -49,6 +51,17 @@ use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
// This is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
/// Build tag/version of the compute node binaries/image. It's tricky and ugly
/// to pass it everywhere as a part of `ComputeNodeParams`, so we use a
/// global static variable.
pub static BUILD_TAG: Lazy<String> = Lazy::new(|| {
option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string()
});
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
@@ -72,7 +85,6 @@ pub struct ComputeNodeParams {
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
pub build_tag: String,
/// The port that the compute's external HTTP server listens on
pub external_http_port: u16,
@@ -173,6 +185,11 @@ impl ComputeState {
info!("Changing compute status from {} to {}", prev, status);
self.status = status;
state_changed.notify_all();
COMPUTE_CTL_UP.reset();
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
.set(1);
}
pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) {
@@ -352,13 +369,19 @@ impl ComputeNode {
}
.launch(&this);
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
// The internal HTTP server is needed for a further activation by control plane
// if compute was started for a pool, so we have to start server before hanging
// waiting for a spec.
crate::http::server::Server::Internal {
port: this.params.internal_http_port,
}
.launch(&this);
// HTTP server is running, so we can officially declare compute_ctl as 'up'
COMPUTE_CTL_UP
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
.set(1);
// If we got a spec from the CLI already, use that. Otherwise wait for the
// control plane to pass it to us with a /configure HTTP request
let pspec = if let Some(cli_spec) = cli_spec {
@@ -2032,12 +2055,8 @@ LIMIT 100",
let mut download_tasks = Vec::new();
for library in &libs_vec {
let (ext_name, ext_path) = remote_extensions.get_ext(
library,
true,
&self.params.build_tag,
&self.params.pgversion,
)?;
let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, &BUILD_TAG, &self.params.pgversion)?;
download_tasks.push(self.download_extension(ext_name, ext_path));
}
let results = join_all(download_tasks).await;

View File

@@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::Deserialize;
use crate::compute::ComputeNode;
use crate::compute::{BUILD_TAG, ComputeNode};
use crate::http::JsonResponse;
use crate::http::extract::{Path, Query};
@@ -47,7 +47,7 @@ pub(in crate::http) async fn download_extension(
remote_extensions.get_ext(
&filename,
ext_server_params.is_library,
&compute.params.build_tag,
&BUILD_TAG,
&compute.params.pgversion,
)
};

View File

@@ -1,7 +1,8 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, UIntGaugeVec, register_gauge, register_int_counter_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -70,8 +71,19 @@ pub(crate) static AUDIT_LOG_DIR_SIZE: Lazy<GenericGauge<AtomicF64>> = Lazy::new(
.expect("failed to define a metric")
});
// Report that `compute_ctl` is up and what's the current compute status.
pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"compute_ctl_up",
"Whether compute_ctl is running",
&["build_tag", "status"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());

View File

@@ -428,11 +428,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
l0_flush_wait_upload: settings
.remove("l0_flush_wait_upload")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'l0_flush_wait_upload' as a boolean")?,
l0_flush_stall_threshold: settings
.remove("l0_flush_stall_threshold")
.map(|x| x.parse::<usize>())

View File

@@ -285,12 +285,6 @@ pub struct TenantConfigToml {
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
pub l0_flush_stall_threshold: Option<usize>,
/// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next
/// layer. This is a temporary backpressure mechanism which should be removed once
/// l0_flush_{delay,stall}_threshold is fully enabled.
///
/// TODO: this is no longer enabled, remove it when the config option is no longer set.
pub l0_flush_wait_upload: bool,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
@@ -579,8 +573,6 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = false;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -627,7 +619,6 @@ impl Default for TenantConfigToml {
compaction_l0_semaphore: DEFAULT_COMPACTION_L0_SEMAPHORE,
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),

View File

@@ -523,8 +523,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_wait_upload: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_horizon: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_period: FieldPatch<String>,
@@ -614,9 +612,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_stall_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub l0_flush_wait_upload: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_horizon: Option<u64>,
@@ -712,7 +707,6 @@ impl TenantConfig {
mut compaction_l0_semaphore,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
@@ -765,7 +759,6 @@ impl TenantConfig {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period
@@ -844,7 +837,6 @@ impl TenantConfig {
compaction_l0_semaphore,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,
@@ -911,9 +903,6 @@ impl TenantConfig {
l0_flush_stall_threshold: self
.l0_flush_stall_threshold
.or(global_conf.l0_flush_stall_threshold),
l0_flush_wait_upload: self
.l0_flush_wait_upload
.unwrap_or(global_conf.l0_flush_wait_upload),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self

View File

@@ -8,10 +8,9 @@ license = "MIT/Apache-2.0"
bytes.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log = "0.4"
tracing.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }

View File

@@ -6,13 +6,13 @@ use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use tracing::{info, trace};
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;

File diff suppressed because it is too large Load Diff

View File

@@ -5,9 +5,9 @@ use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;

View File

@@ -7,11 +7,11 @@ use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::{Level, debug, log_enabled};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
@@ -36,7 +36,7 @@ where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if log_enabled!(Level::Debug) {
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",

View File

@@ -6,10 +6,10 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;

View File

@@ -1133,6 +1133,40 @@ components:
applied_gc_cutoff_lsn:
type: string
format: hex
safekeepers:
$ref: "#/components/schemas/TimelineSafekeepersInfo"
TimelineSafekeepersInfo:
type: object
required:
- tenant_id
- timeline_id
- generation
- safekeepers
properties:
tenant_id:
type: string
format: hex
timeline_id:
type: string
format: hex
generation:
type: integer
safekeepers:
type: array
items:
$ref: "#/components/schemas/TimelineSafekeeperInfo"
TimelineSafekeeperInfo:
type: object
required:
- id
- hostname
properties:
id:
type: integer
hostname:
type: string
SyntheticSizeResponse:
type: object

View File

@@ -2256,7 +2256,6 @@ async fn timeline_compact_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
@@ -2417,7 +2416,6 @@ async fn timeline_checkpoint_handler(
let state = get_state(&request);
let mut flags = EnumSet::empty();
flags |= CompactFlags::NoYield; // run compaction to completion
if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
flags |= CompactFlags::ForceL0Compaction;
}
@@ -3776,7 +3774,7 @@ pub fn make_router(
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| testing_api_handler("mark timeline invisible", r, timeline_mark_invisible_handler),
|r| api_handler( r, timeline_mark_invisible_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",

View File

@@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use futures::Future;
use metrics::{
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -499,15 +499,6 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
.expect("failed to define a metric")
});
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2864,7 +2855,6 @@ pub(crate) struct TimelineMetrics {
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
@@ -2916,9 +2906,6 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
@@ -3046,7 +3033,6 @@ impl TimelineMetrics {
timeline_id,
flush_time_histo,
flush_delay_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
@@ -3096,14 +3082,6 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}
/// Generates TIMELINE_LAYER labels for a persistent layer.
fn make_layer_labels(&self, layer_desc: &PersistentLayerDesc) -> [&str; 5] {
let level = match LayerMap::is_l0(&layer_desc.key_range, layer_desc.is_delta()) {
@@ -3207,7 +3185,6 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());

View File

@@ -38,6 +38,7 @@ use std::panic::AssertUnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use once_cell::sync::Lazy;
@@ -584,18 +585,25 @@ pub async fn shutdown_tasks(
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
const INITIAL_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(1);
const PERIODIC_COMPLAIN_TIMEOUT: Duration = Duration::from_secs(60);
if tokio::time::timeout(INITIAL_COMPLAIN_TIMEOUT, &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
loop {
tokio::select! {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
_ = &mut join_handle => break,
_ = tokio::time::sleep(PERIODIC_COMPLAIN_TIMEOUT) => info!("still waiting for task {} to shut down", task.name),
}
}
info!("task {} completed", task.name);
}
} else {

View File

@@ -3080,6 +3080,7 @@ impl Tenant {
let mut has_pending_l0 = false;
for timeline in compact_l0 {
let ctx = &ctx.with_scope_timeline(&timeline);
// NB: don't set CompactFlags::YieldForL0, since this is an L0-only compaction pass.
let outcome = timeline
.compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
@@ -3097,14 +3098,9 @@ impl Tenant {
}
}
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated
// more L0 layers, they may also be compacted here.
//
// NB: image compaction may yield if there is pending L0 compaction.
//
// TODO: it will only yield if there is pending L0 compaction on the same timeline. If a
// different timeline needs compaction, it won't. It should check `l0_compaction_trigger`.
// We leave this for a later PR.
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated more
// L0 layers, they may also be compacted here. Image compaction will yield if there is
// pending L0 compaction on any tenant timeline.
//
// TODO: consider ordering timelines by some priority, e.g. time since last full compaction,
// amount of L1 delta debt or garbage, offload-eligible timelines first, etc.
@@ -3115,8 +3111,14 @@ impl Tenant {
}
let ctx = &ctx.with_scope_timeline(&timeline);
// Yield for L0 if the separate L0 pass is enabled (otherwise there's no point).
let mut flags = EnumSet::default();
if self.get_compaction_l0_first() {
flags |= CompactFlags::YieldForL0;
}
let mut outcome = timeline
.compact(cancel, EnumSet::default(), ctx)
.compact(cancel, flags, ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
@@ -6516,11 +6518,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6537,11 +6535,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6558,11 +6552,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
let mut writer = tline.writer().await;
@@ -6579,11 +6569,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline
.compact(
&CancellationToken::new(),
CompactFlags::NoYield.into(),
&ctx,
)
.compact(&CancellationToken::new(), EnumSet::default(), &ctx)
.await?;
assert_eq!(
@@ -6666,9 +6652,7 @@ mod tests {
timeline.freeze_and_flush().await?;
if compact {
// this requires timeline to be &Arc<Timeline>
timeline
.compact(&cancel, CompactFlags::NoYield.into(), ctx)
.await?;
timeline.compact(&cancel, EnumSet::default(), ctx).await?;
}
// this doesn't really need to use the timeline_id target, but it is closer to what it
@@ -6995,7 +6979,6 @@ mod tests {
child_timeline.freeze_and_flush().await?;
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
child_timeline
.compact(&CancellationToken::new(), flags, &ctx)
.await?;
@@ -7374,9 +7357,7 @@ mod tests {
// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
@@ -7705,7 +7686,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
} else {
EnumSet::empty()
@@ -7756,9 +7736,7 @@ mod tests {
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline
.compact(&cancel, CompactFlags::NoYield.into(), &ctx)
.await?;
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
@@ -7923,7 +7901,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8386,7 +8363,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -8454,7 +8430,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::NoYield);
flags
},
&ctx,
@@ -11551,4 +11526,255 @@ mod tests {
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
use pageserver_api::models::TimelineVisibilityState;
use crate::tenant::size::gather_inputs;
let tenant_conf = pageserver_api::models::TenantConfig {
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
pitr_interval: Some(Duration::ZERO),
..Default::default()
};
let harness = TenantHarness::create_custom(
"test_synthetic_size_calculation_with_invisible_branches",
tenant_conf,
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)
.await?;
let (tenant, ctx) = harness.load().await;
let main_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![],
vec![],
vec![],
Lsn(0x100),
)
.await?;
let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot1,
Some(Lsn(0x20)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot2,
Some(Lsn(0x30)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot3,
Some(Lsn(0x40)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let limit = Arc::new(Semaphore::new(1));
let max_retention_period = None;
let mut logical_size_cache = HashMap::new();
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
let cancel = CancellationToken::new();
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
use LsnKind::*;
use tenant_size_model::Segment;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: GcCutOff,
}, // we need to retain everything above the last branch point
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
main_tline
.remote_client
.schedule_index_upload_for_timeline_invisible_state(
TimelineVisibilityState::Invisible,
)?;
main_tline.remote_client.wait_completion().await?;
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40, // Branch end LSN == last branch point LSN
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
Ok(())
}
}

View File

@@ -1968,9 +1968,7 @@ impl RemoteTimelineClient {
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
/// The number of inprogress tasks is limited by `Self::inprogress_tasks`, see `next_ready`.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");
@@ -2218,6 +2216,11 @@ impl RemoteTimelineClient {
}
res
}
// TODO: this should wait for the deletion to be executed by the deletion queue.
// Otherwise, the deletion may race with an upload and wrongfully delete a newer
// file. Some of the above logic attempts to work around this, it should be replaced
// by the upload queue ordering guarantees (see `can_bypass`). See:
// <https://github.com/neondatabase/neon/issues/10283>.
UploadOp::Delete(delete) => {
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();

View File

@@ -33,7 +33,7 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct SegmentMeta {
pub segment: Segment,
pub timeline_id: TimelineId,
@@ -248,6 +248,8 @@ pub(super) async fn gather_inputs(
None
};
let branch_is_invisible = timeline.is_invisible() == Some(true);
let lease_points = gc_info
.leases
.keys()
@@ -271,7 +273,10 @@ pub(super) async fn gather_inputs(
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
if !branch_is_invisible {
// Do not count lease points for invisible branches.
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
}
drop(gc_info);
@@ -287,7 +292,9 @@ pub(super) async fn gather_inputs(
// Add a point for the PITR cutoff
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
if !branch_start_needed {
if !branch_start_needed && !branch_is_invisible {
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
// range from the last branch point to the latest data.
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
}
@@ -373,11 +380,19 @@ pub(super) async fn gather_inputs(
}
}
let branch_end_lsn = if branch_is_invisible {
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
segments.last().unwrap().segment.lsn
} else {
// Otherwise, the branch end is the last record LSN.
last_record_lsn.0
};
// Current end of the timeline
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: last_record_lsn.0,
lsn: branch_end_lsn,
size: None, // Filled in later, if necessary
needed: true,
},
@@ -609,6 +624,7 @@ async fn calculate_logical_size(
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[cfg(test)]
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
@@ -766,6 +782,7 @@ fn verify_size_for_multiple_branches() {
assert_eq!(inputs.calculate(), 37_851_408);
}
#[cfg(test)]
#[test]
fn verify_size_for_one_branch() {
let doc = r#"

View File

@@ -84,8 +84,8 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::remote_timeline_client::index::{GcCompactionState, IndexPart};
use super::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use super::secondary::heatmap::HeatMapLayer;
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
@@ -870,9 +870,14 @@ pub(crate) enum CompactFlags {
OnlyL0Compaction,
EnhancedGcBottomMostCompaction,
DryRun,
/// Disables compaction yielding e.g. due to high L0 count. This is set e.g. when requesting
/// compaction via HTTP API.
NoYield,
/// Makes image compaction yield if there's pending L0 compaction. This should always be used in
/// the background compaction task, since we want to aggressively compact down L0 to bound
/// read amplification.
///
/// It only makes sense to use this when `compaction_l0_first` is enabled (such that we yield to
/// an L0 compaction pass), and without `OnlyL0Compaction` (L0 compaction shouldn't yield for L0
/// compaction).
YieldForL0,
}
#[serde_with::serde_as]
@@ -1891,18 +1896,19 @@ impl Timeline {
// out by other background tasks (including image compaction). We request this via
// `BackgroundLoopKind::L0Compaction`.
//
// If this is a regular compaction pass, and L0-only compaction is enabled in the config,
// then we should yield for immediate L0 compaction if necessary while we're waiting for the
// background task semaphore. There's no point yielding otherwise, since we'd just end up
// right back here.
// Yield for pending L0 compaction while waiting for the semaphore.
let is_l0_only = options.flags.contains(CompactFlags::OnlyL0Compaction);
let semaphore_kind = match is_l0_only && self.get_compaction_l0_semaphore() {
true => BackgroundLoopKind::L0Compaction,
false => BackgroundLoopKind::Compaction,
};
let yield_for_l0 = !is_l0_only
&& self.get_compaction_l0_first()
&& !options.flags.contains(CompactFlags::NoYield);
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
if yield_for_l0 {
// If this is an L0 pass, it doesn't make sense to yield for L0.
debug_assert!(!is_l0_only, "YieldForL0 during L0 pass");
// If `compaction_l0_first` is disabled, there's no point yielding.
debug_assert!(self.get_compaction_l0_first(), "YieldForL0 without L0 pass");
}
let acquire = async move {
let guard = self.compaction_lock.lock().await;
@@ -2209,6 +2215,10 @@ impl Timeline {
self.remote_client.is_archived()
}
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.remote_client.is_invisible()
}
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
@@ -2562,14 +2572,6 @@ impl Timeline {
Some(max(l0_flush_stall_threshold, compaction_threshold))
}
fn get_l0_flush_wait_upload(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.l0_flush_wait_upload
.unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -4591,27 +4593,6 @@ impl Timeline {
// release lock on 'layers'
};
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
// TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead.
if self.get_l0_flush_wait_upload() {
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);
}
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this

View File

@@ -394,8 +394,8 @@ impl GcCompactionQueue {
if job.dry_run {
flags |= CompactFlags::DryRun;
}
if options.flags.contains(CompactFlags::NoYield) {
flags |= CompactFlags::NoYield;
if options.flags.contains(CompactFlags::YieldForL0) {
flags |= CompactFlags::YieldForL0;
}
let options = CompactOptions {
flags,
@@ -983,7 +983,7 @@ impl Timeline {
// Yield if we have pending L0 compaction. The scheduler will do another pass.
if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
&& !options.flags.contains(CompactFlags::NoYield)
&& options.flags.contains(CompactFlags::YieldForL0)
{
info!("image/ancestor compaction yielding for L0 compaction");
return Ok(CompactionOutcome::YieldForL0);
@@ -1028,7 +1028,7 @@ impl Timeline {
.load()
.as_ref()
.clone(),
!options.flags.contains(CompactFlags::NoYield),
options.flags.contains(CompactFlags::YieldForL0),
)
.await
.inspect_err(|err| {
@@ -2635,7 +2635,7 @@ impl Timeline {
) -> Result<CompactionOutcome, CompactionError> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options.clone());
let no_yield = options.flags.contains(CompactFlags::NoYield);
let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
if sub_compaction {
info!(
"running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
@@ -2650,7 +2650,7 @@ impl Timeline {
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx, no_yield)
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.await?;
}
if jobs_len == 0 {
@@ -2658,7 +2658,8 @@ impl Timeline {
}
return Ok(CompactionOutcome::Done);
}
self.compact_with_gc_inner(cancel, job, ctx, no_yield).await
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.await
}
async fn compact_with_gc_inner(
@@ -2666,7 +2667,7 @@ impl Timeline {
cancel: &CancellationToken,
job: GcCompactJob,
ctx: &RequestContext,
no_yield: bool,
yield_for_l0: bool,
) -> Result<CompactionOutcome, CompactionError> {
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
@@ -2936,18 +2937,15 @@ impl Timeline {
if cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
if !no_yield {
let should_yield = self
let should_yield = yield_for_l0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction when downloading layers: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
if should_yield {
tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
let resident_layer = layer
.download_and_keep_resident(ctx)
@@ -3081,21 +3079,17 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
if !no_yield {
keys_processed += 1;
if keys_processed % 1000 == 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt gc-compaction in the main loop: too many L0 layers"
);
return Ok(CompactionOutcome::YieldForL0);
}
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
return Ok(CompactionOutcome::YieldForL0);
}
if self.shard_identity.is_key_disposable(&key) {
// If this shard does not need to store this key, simply skip it.

View File

@@ -235,7 +235,7 @@ pub(super) async fn prepare(
return Err(NoAncestor);
}
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn, behavior)?;
if let DetachBehavior::MultiLevelAndNoReparent = behavior {
// If the ancestor has an ancestor, we might be able to fast-path detach it if the current ancestor does not have any data written/used by the detaching timeline.
@@ -249,7 +249,13 @@ pub(super) async fn prepare(
ancestor_lsn = ancestor.ancestor_lsn; // Get the LSN first before resetting the `ancestor` variable
ancestor = ancestor_of_ancestor;
// TODO: do we still need to check if we don't want to reparent?
check_no_archived_children_of_ancestor(tenant, detached, &ancestor, ancestor_lsn)?;
check_no_archived_children_of_ancestor(
tenant,
detached,
&ancestor,
ancestor_lsn,
behavior,
)?;
}
} else if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
@@ -1156,31 +1162,44 @@ fn check_no_archived_children_of_ancestor(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,
detach_behavior: DetachBehavior,
) -> Result<(), Error> {
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn) {
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
match detach_behavior {
DetachBehavior::NoAncestorAndReparent => {
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
for timeline in
reparentable_timelines(timelines.values(), detached, ancestor, ancestor_lsn)
{
if timeline.is_archived() == Some(true) {
return Err(Error::Archived(timeline.timeline_id));
}
}
for timeline_offloaded in timelines_offloaded.values() {
if timeline_offloaded.ancestor_timeline_id != Some(ancestor.timeline_id) {
continue;
}
// This forbids the detach ancestor feature if flattened timelines are present,
// even if the ancestor_lsn is from after the branchpoint of the detached timeline.
// But as per current design, we don't record the ancestor_lsn of flattened timelines.
// This is a bit unfortunate, but as of writing this we don't support flattening
// anyway. Maybe we can evolve the data model in the future.
if let Some(retain_lsn) = timeline_offloaded.ancestor_retain_lsn {
let is_earlier = retain_lsn <= ancestor_lsn;
if !is_earlier {
continue;
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
}
}
return Err(Error::Archived(timeline_offloaded.timeline_id));
DetachBehavior::MultiLevelAndNoReparent => {
// We don't need to check anything if the user requested to not reparent.
}
}
Ok(())
}

View File

@@ -647,18 +647,25 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return found;
}
#if PG_MAJORVERSION_NUM >= 16
static PGIOAlignedBlock voidblock = {0};
#else
static PGAlignedBlock voidblock = {0};
#endif
#define SCRIBBLEPAGE (&voidblock.data)
/*
* Try to read pages from local cache.
* Returns the number of pages read from the local cache, and sets bits in
* 'read' for the pages which were read. This may scribble over buffers not
* marked in 'read', so be careful with operation ordering.
* 'mask' for the pages which were read. This may scribble over buffers not
* marked in 'mask', so be careful with operation ordering.
*
* In case of error local file cache is disabled (lfc->limit is set to zero),
* and -1 is returned. Note that 'read' and the buffers may be touched and in
* an otherwise invalid state.
* and -1 is returned.
*
* If the mask argument is supplied, bits will be set at the offsets of pages
* that were present and read from the LFC.
* If the mask argument is supplied, we'll only try to read those pages which
* don't have their bits set on entry. At exit, pages which were successfully
* read from LFC will have their bits set.
*/
int
lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -693,23 +700,43 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
int n_blocks_to_read = 0;
int n_blocks_to_read = 0;
int iov_last_used = 0;
int first_block_in_chunk_read = -1;
ConditionVariable* cv;
Assert(blocks_in_chunk > 0);
for (int i = 0; i < blocks_in_chunk; i++)
{
n_blocks_to_read += (BITMAP_ISSET(mask, buf_offset + i) != 0);
iov[i].iov_base = buffers[buf_offset + i];
iov[i].iov_len = BLCKSZ;
BITMAP_CLR(mask, buf_offset + i);
/* mask not set = we must do work */
if (!BITMAP_ISSET(mask, buf_offset + i))
{
iov[i].iov_base = buffers[buf_offset + i];
n_blocks_to_read++;
iov_last_used = i + 1;
if (first_block_in_chunk_read == -1)
{
first_block_in_chunk_read = i;
}
}
/* mask set = we must do no work */
else
{
/* don't scribble on pages we weren't requested to write to */
iov[i].iov_base = SCRIBBLEPAGE;
}
}
/* shortcut IO */
if (n_blocks_to_read == 0)
{
buf_offset += blocks_in_chunk;
@@ -718,6 +745,12 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
continue;
}
/*
* The effective iov size must be >= the number of blocks we're about
* to read.
*/
Assert(iov_last_used - first_block_in_chunk_read >= n_blocks_to_read);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -762,10 +795,15 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
generation = lfc_ctl->generation;
entry_offset = entry->offset;
for (int i = 0; i < blocks_in_chunk; i++)
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
FileCacheBlockState state = UNAVAILABLE;
bool sleeping = false;
/* no need to work on something we're not interested in */
if (BITMAP_ISSET(mask, buf_offset + i))
continue;
while (lfc_ctl->generation == generation)
{
state = GET_STATE(entry, chunk_offs + i);
@@ -789,7 +827,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
if (state == AVAILABLE)
{
BITMAP_SET(mask, buf_offset + i);
BITMAP_SET(chunk_mask, i);
iteration_hits++;
}
else
@@ -801,16 +839,34 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
/* Read only the blocks we're interested in, limiting */
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
nwrite, first_read_offset * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * blocks_in_chunk))
if (rc != (BLCKSZ * nwrite))
{
lfc_disable("read");
return -1;
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
/* Place entry to the head of LRU list */

View File

@@ -1142,37 +1142,23 @@ pageserver_try_receive(shardno_t shard_no)
NeonResponse *resp;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
/* read response */
int rc;
int rc;
if (shard->state != PS_Connected)
return NULL;
Assert(pageserver_conn);
while (true)
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (PQisBusy(shard->conn))
if (!PQconsumeInput(shard->conn))
{
WaitEvent event;
if (WaitEventSetWait(shard->wes_read, 0, &event, 1,
WAIT_EVENT_NEON_PS_READ) != 1
|| (event.events & WL_SOCKET_READABLE) == 0)
{
return NULL;
}
return NULL;
}
rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async */);
if (rc == 0)
{
if (!PQconsumeInput(shard->conn))
{
return NULL;
}
}
else
break;
}
if (rc == 0)
return NULL;
else if (rc > 0)

View File

@@ -315,7 +315,7 @@ static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 1;
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

View File

@@ -99,7 +99,7 @@ static char *hexdump_page(char *page);
#define IS_LOCAL_REL(reln) (\
NInfoGetDbOid(InfoFromSMgrRel(reln)) != 0 && \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) > FirstNormalObjectId \
NInfoGetRelNumber(InfoFromSMgrRel(reln)) >= FirstNormalObjectId \
)
const int SmgrTrace = DEBUG5;
@@ -1081,6 +1081,9 @@ prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_r
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
* to calculate the LSNs to send.
*
* Bits set in *mask (if present) indicate pages already read; i.e. pages we
* can skip in this process.
*
* When performing a prefetch rather than a synchronous request,
* is_prefetch==true. Currently, it only affects how the request is accounted
* in the perf counters.
@@ -1126,7 +1129,7 @@ Retry:
uint64 ring_index;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
if (frlsns)
@@ -2381,7 +2384,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LSN_FORMAT_ARGS(last_written_lsn),
LSN_FORMAT_ARGS(flushlsn));
XLogFlush(last_written_lsn);
flushlsn = last_written_lsn;
}
/*
@@ -2397,18 +2399,35 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
*
* Remember the current LSN, however, so that we can later
* correctly determine if the response to the request is still
* valid. The most up-to-date LSN we could use for that purpose
* would be the current insert LSN, but to avoid the overhead of
* looking it up, use 'flushlsn' instead. This relies on the
* assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we
* wouldn't be requesting it.
* effective_request_lsn is used to check that received response is still valid.
* In case of primary node it is last written LSN. Originally we used flush_lsn here,
* but it is not correct. Consider the following scenario:
* 1. Backend A wants to prefetch block X
* 2. Backend A checks that block X is not present in the shared buffer cache
* 3. Backend A calls prefetch_do_request, which calls neon_get_request_lsns
* 4. neon_get_request_lsns obtains LwLSN=11 for the block
* 5. Backend B downloads block X, updates and wallogs it with LSN=13
* 6. Block X is once again evicted from shared buffers, its LwLSN is set to LSN=13
* 7. Backend A is still executing in neon_get_request_lsns(). It calls 'flushlsn = GetFlushRecPtr();'.
* Let's say that it is LSN=14
* 8. Backend A uses LSN=14 as effective_lsn in the prefetch slot. The request stored in the slot is
* [not_modified_since=11, effective_request_lsn=14]
* 9. Backend A sends the prefetch request, pageserver processes it, and sends response.
* The last LSN that the pageserver had processed was LSN=12, so the page image in the response is valid at LSN=12.
* 10. Backend A calls smgrread() for page X with LwLSN=13
* 11. Backend A finds in prefetch ring the response for the prefetch request with [not_modified_since=11, effective_lsn=Lsn14],
* so it satisfies neon_prefetch_response_usable condition.
*
* Things go wrong in step 7-8, when [not_modified_since=11, effective_request_lsn=14] is determined for the request.
* That is incorrect, because the page has in fact been modified at LSN=13. The invariant is that for any request,
* there should not be any modifications to a page between its not_modified_since and (effective_)request_lsn values.
*
* The problem can be fixed by callingGetFlushRecPtr() before checking if the page is in the buffer cache.
* But you can't do that within smgrprefetch(), would need to modify the caller.
*/
result->request_lsn = UINT64_MAX;
result->not_modified_since = last_written_lsn;
result->effective_request_lsn = flushlsn;
result->effective_request_lsn = last_written_lsn;
}
}
}
@@ -2467,11 +2486,8 @@ neon_prefetch_response_usable(neon_request_lsns *request_lsns,
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the last flush WAL position when the request
* was sent to the pageserver. That's logically the LSN that we are
* requesting the page at, but we send UINT64_MAX to the pageserver so
* that if the GC horizon advances past that position, we still get a
* valid response instead of an error.
* `effective_request_lsn` is the same as `not_modified_since`.
* See comments in neon_get_request_lsns why we can not use last flush WAL position here.
*
* To determine whether a response to a GetPage request issued earlier is
* still valid to satisfy a new page read, we look at the
@@ -3026,9 +3042,6 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
@@ -3134,6 +3147,15 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
#endif
}
/*
* Read N pages at a specific LSN.
*
* *mask is set for pages read at a previous point in time, and which we
* should not touch, nor overwrite.
* New bits should be set in *mask for the pages we'successfully read.
*
* The offsets in request_lsns, buffers, and mask are linked.
*/
static void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
@@ -3186,7 +3208,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block
neon_request_lsns *reqlsns = &request_lsns[i];
TimestampTz start_ts, end_ts;
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
start_ts = GetCurrentTimestamp();
@@ -3485,9 +3507,7 @@ static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
{
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
bits8 read_pages[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;
@@ -3519,19 +3539,18 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
memset(read_pages, 0, sizeof(read_pages));
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
if (prefetch_result == nblocks)
return;
/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, lfc_hits);
nblocks, read_pages);
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
@@ -3540,21 +3559,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (prefetch_result + lfc_result == nblocks)
return;
if (lfc_result <= 0)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~prefetch_hits[i];
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
}
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read);
buffers, nblocks, read_pages);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.

40
poetry.lock generated
View File

@@ -3111,30 +3111,30 @@ six = "*"
[[package]]
name = "ruff"
version = "0.7.0"
version = "0.11.2"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.7.0-py3-none-linux_armv6l.whl", hash = "sha256:0cdf20c2b6ff98e37df47b2b0bd3a34aaa155f59a11182c1303cce79be715628"},
{file = "ruff-0.7.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:496494d350c7fdeb36ca4ef1c9f21d80d182423718782222c29b3e72b3512737"},
{file = "ruff-0.7.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:214b88498684e20b6b2b8852c01d50f0651f3cc6118dfa113b4def9f14faaf06"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:630fce3fefe9844e91ea5bbf7ceadab4f9981f42b704fae011bb8efcaf5d84be"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:211d877674e9373d4bb0f1c80f97a0201c61bcd1e9d045b6e9726adc42c156aa"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:194d6c46c98c73949a106425ed40a576f52291c12bc21399eb8f13a0f7073495"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:82c2579b82b9973a110fab281860403b397c08c403de92de19568f32f7178598"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9af971fe85dcd5eaed8f585ddbc6bdbe8c217fb8fcf510ea6bca5bdfff56040e"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b641c7f16939b7d24b7bfc0be4102c56562a18281f84f635604e8a6989948914"},
{file = "ruff-0.7.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d71672336e46b34e0c90a790afeac8a31954fd42872c1f6adaea1dff76fd44f9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ab7d98c7eed355166f367597e513a6c82408df4181a937628dbec79abb2a1fe4"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:1eb54986f770f49edb14f71d33312d79e00e629a57387382200b1ef12d6a4ef9"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:dc452ba6f2bb9cf8726a84aa877061a2462afe9ae0ea1d411c53d226661c601d"},
{file = "ruff-0.7.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:4b406c2dce5be9bad59f2de26139a86017a517e6bcd2688da515481c05a2cb11"},
{file = "ruff-0.7.0-py3-none-win32.whl", hash = "sha256:f6c968509f767776f524a8430426539587d5ec5c662f6addb6aa25bc2e8195ec"},
{file = "ruff-0.7.0-py3-none-win_amd64.whl", hash = "sha256:ff4aabfbaaba880e85d394603b9e75d32b0693152e16fa659a3064a85df7fce2"},
{file = "ruff-0.7.0-py3-none-win_arm64.whl", hash = "sha256:10842f69c245e78d6adec7e1db0a7d9ddc2fff0621d730e61657b64fa36f207e"},
{file = "ruff-0.7.0.tar.gz", hash = "sha256:47a86360cf62d9cd53ebfb0b5eb0e882193fc191c6d717e8bef4462bc3b9ea2b"},
{file = "ruff-0.11.2-py3-none-linux_armv6l.whl", hash = "sha256:c69e20ea49e973f3afec2c06376eb56045709f0212615c1adb0eda35e8a4e477"},
{file = "ruff-0.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:2c5424cc1c4eb1d8ecabe6d4f1b70470b4f24a0c0171356290b1953ad8f0e272"},
{file = "ruff-0.11.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:ecf20854cc73f42171eedb66f006a43d0a21bfb98a2523a809931cda569552d9"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0c543bf65d5d27240321604cee0633a70c6c25c9a2f2492efa9f6d4b8e4199bb"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:20967168cc21195db5830b9224be0e964cc9c8ecf3b5a9e3ce19876e8d3a96e3"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:955a9ce63483999d9f0b8f0b4a3ad669e53484232853054cc8b9d51ab4c5de74"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:86b3a27c38b8fce73bcd262b0de32e9a6801b76d52cdb3ae4c914515f0cef608"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a3b66a03b248c9fcd9d64d445bafdf1589326bee6fc5c8e92d7562e58883e30f"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0397c2672db015be5aa3d4dac54c69aa012429097ff219392c018e21f5085147"},
{file = "ruff-0.11.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:869bcf3f9abf6457fbe39b5a37333aa4eecc52a3b99c98827ccc371a8e5b6f1b"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2a2b50ca35457ba785cd8c93ebbe529467594087b527a08d487cf0ee7b3087e9"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:7c69c74bf53ddcfbc22e6eb2f31211df7f65054bfc1f72288fc71e5f82db3eab"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6e8fb75e14560f7cf53b15bbc55baf5ecbe373dd5f3aab96ff7aa7777edd7630"},
{file = "ruff-0.11.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:842a472d7b4d6f5924e9297aa38149e5dcb1e628773b70e6387ae2c97a63c58f"},
{file = "ruff-0.11.2-py3-none-win32.whl", hash = "sha256:aca01ccd0eb5eb7156b324cfaa088586f06a86d9e5314b0eb330cb48415097cc"},
{file = "ruff-0.11.2-py3-none-win_amd64.whl", hash = "sha256:3170150172a8f994136c0c66f494edf199a0bbea7a409f649e4bc8f4d7084080"},
{file = "ruff-0.11.2-py3-none-win_arm64.whl", hash = "sha256:52933095158ff328f4c77af3d74f0379e34fd52f175144cefc1b192e7ccd32b4"},
{file = "ruff-0.11.2.tar.gz", hash = "sha256:ec47591497d5a1050175bdf4e1a4e6272cddff7da88a2ad595e1e326041d8d94"},
]
[[package]]
@@ -3844,4 +3844,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "715fc8c896dcfa1b15054deeddcdec557ef93af91b26e1c8e4688fe4dbef5296"
content-hash = "fb50cb6b291169dce3188560cdb31a14af95647318f8f0f0d718131dbaf1817a"

View File

@@ -103,6 +103,7 @@ uuid.workspace = true
x509-cert.workspace = true
redis.workspace = true
zerocopy.workspace = true
flag-bearer = { version = "0.1.0-rc.4" }
# jwt stuff
jose-jwa = "0.1.2"

View File

@@ -314,9 +314,9 @@ pub async fn run() -> anyhow::Result<()> {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => Some(
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
),
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(

View File

@@ -1,5 +1,6 @@
//! Mock console backend which relies on a user-provided postgres instance.
use std::io;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::sync::Arc;
@@ -22,7 +23,6 @@ use crate::control_plane::errors::{
};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::{AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
use crate::error::io_error;
use crate::intern::RoleNameInt;
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
use crate::url::ApiUrl;
@@ -36,13 +36,13 @@ enum MockApiError {
impl From<MockApiError> for ControlPlaneError {
fn from(e: MockApiError) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}
impl From<tokio_postgres::Error> for ControlPlaneError {
fn from(e: tokio_postgres::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}

View File

@@ -1,8 +1,10 @@
use std::io;
use thiserror::Error;
use crate::control_plane::client::ApiLockError;
use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason};
use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error};
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
@@ -79,13 +81,13 @@ impl CouldRetry for ControlPlaneError {
impl From<reqwest::Error> for ControlPlaneError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}
impl From<reqwest_middleware::Error> for ControlPlaneError {
fn from(e: reqwest_middleware::Error) -> Self {
io_error(e).into()
io::Error::other(e).into()
}
}

View File

@@ -1,15 +1,9 @@
use std::error::Error as StdError;
use std::{fmt, io};
use std::fmt;
use anyhow::Context;
use measured::FixedCardinalityLabel;
use tokio::task::JoinError;
/// Upcast (almost) any error into an opaque [`io::Error`].
pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// Marks errors that may be safely shown to a client.
/// This trait can be seen as a specialized version of [`ToString`].
///

View File

@@ -163,8 +163,7 @@ fn process_proxy_payload(
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
#[rustfmt::skip] // https://github.com/rust-lang/rustfmt/issues/6384
_ => return Err(io::Error::new(
io::ErrorKind::Other,
_ => return Err(io::Error::other(
format!(
"invalid proxy protocol command 0x{:02X}. expected local (0x20) or proxy (0x21)",
header.version_and_command
@@ -178,21 +177,20 @@ fn process_proxy_payload(
TCP_OVER_IPV4 | UDP_OVER_IPV4 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV4>()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
.ok_or_else(|| io::Error::other(size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
TCP_OVER_IPV6 | UDP_OVER_IPV6 => {
let addr = payload
.try_get::<ProxyProtocolV2HeaderV6>()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
.ok_or_else(|| io::Error::other(size_err))?;
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
}
// unspecified or unix stream. ignore the addresses
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(io::Error::other(
"invalid proxy protocol address family/transport protocol.",
));
}

View File

@@ -1,10 +1,8 @@
//! Algorithms for controlling concurrency limits.
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::Notify;
use flag_bearer::{Semaphore, SemaphoreState, Uncloseable};
use tokio::time::Instant;
use tokio::time::error::Elapsed;
@@ -65,23 +63,13 @@ pub struct RateLimiterConfig {
pub(crate) initial_limit: usize,
}
impl RateLimiterConfig {
pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub(crate) struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
pub(crate) struct LimiterInner<L: ?Sized> {
limit: usize,
in_flight: usize,
alg: L,
}
impl LimiterInner {
impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
@@ -92,21 +80,26 @@ impl LimiterInner {
self.limit = self.alg.update(self.limit, sample);
}
}
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available >= 1 {
self.available -= 1;
impl<L: ?Sized> SemaphoreState for LimiterInner<L> {
type Params = ();
type Permit = ();
type Closeable = Uncloseable;
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
if self.in_flight < self.limit {
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available >= 1 {
ready.notify_one();
}
Some(())
Ok(p)
} else {
None
Err(p)
}
}
fn release(&mut self, _p: Self::Permit) {
self.in_flight -= 1;
}
}
/// Limits the number of concurrent jobs.
@@ -116,11 +109,9 @@ impl LimiterInner {
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
// to notify when a token is available
ready: Notify,
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
disabled: bool,
sem: Semaphore<LimiterInner<L>>,
}
/// A concurrency token, required to run a job.
@@ -140,22 +131,27 @@ struct LimiterState {
limit: usize,
}
impl<L> DynamicLimiter<L> {
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
Arc::new(Self {
disabled: initial_limit == 0,
sem: Semaphore::new_fifo(LimiterInner {
limit: initial_limit,
in_flight: 0,
alg,
}),
})
}
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
in_flight: 0,
}),
ready,
config,
})
let initial_limit = config.initial_limit;
match config.algorithm {
RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed),
RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf),
}
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
@@ -163,27 +159,21 @@ impl DynamicLimiter {
self: &Arc<Self>,
duration: Duration,
) -> Result<Token, Elapsed> {
tokio::time::timeout(duration, self.acquire()).await?
tokio::time::timeout(duration, self.acquire()).await
}
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
async fn acquire(self: &Arc<Self>) -> Token {
if self.disabled {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
}
notified.set(self.ready.notified());
match self.sem.acquire(()).await {
Err(close) => close.never(),
Ok(permit) => {
permit.take();
Token::new(self.clone())
}
notified.as_mut().await;
ready = true;
}
}
}
@@ -200,27 +190,20 @@ impl DynamicLimiter {
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
if self.disabled {
return;
}
let mut inner = self.inner.lock();
inner.update_limit(start.elapsed(), outcome);
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}
self.sem.with_state(|s| {
s.update_limit(start.elapsed(), outcome);
s.release(());
});
}
/// The current state of the limiter.
#[cfg(test)]
fn state(&self) -> LimiterState {
let inner = self.inner.lock();
LimiterState { limit: inner.limit }
self.sem.with_state(|s| LimiterState { limit: s.limit })
}
}

View File

@@ -143,6 +143,8 @@ impl ConnectionWithCredentialsProvider {
db: 0,
username: Some(username),
password: Some(password.clone()),
// TODO: switch to RESP3 after testing new client version.
protocol: redis::ProtocolVersion::RESP2,
},
})
}

View File

@@ -19,7 +19,7 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
Value::String(s) => Some(s.clone()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),

View File

@@ -866,7 +866,7 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let res = match select(
match select(
pin!(query_to_json(
config,
&mut *inner,
@@ -889,7 +889,7 @@ impl QueryData {
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
return Err(e);
Err(e)
}
// The query was cancelled.
Either::Right((_cancelled, query)) => {
@@ -930,8 +930,7 @@ impl QueryData {
}
}
}
};
res
}
}
}

View File

@@ -15,7 +15,7 @@ use tracing::warn;
use crate::cancellation::CancellationHandler;
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ReportableError, io_error};
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::{ClientMode, ErrorSource, handle_client};
use crate::rate_limiter::EndpointRateLimiter;
@@ -50,23 +50,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
let this = self.project();
let mut stream = this.stream;
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
ready!(stream.as_mut().poll_ready(cx).map_err(io::Error::other))?;
this.send.put(buf);
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io_error(e))),
Err(e) => Poll::Ready(Err(io::Error::other(e))),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_flush(cx).map_err(io_error)
stream.poll_flush(cx).map_err(io::Error::other)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = self.project().stream;
stream.poll_close(cx).map_err(io_error)
stream.poll_close(cx).map_err(io::Error::other)
}
}
@@ -97,7 +97,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
}
let res = ready!(this.stream.as_mut().poll_next(cx));
match res.transpose().map_err(io_error)? {
match res.transpose().map_err(io::Error::other)? {
Some(message) => match message.opcode {
OpCode::Ping => {}
OpCode::Pong => {}
@@ -105,7 +105,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
// We expect to see only binary messages.
let error = "unexpected text message in the websocket";
warn!(length = message.payload.len(), error);
return Poll::Ready(Err(io_error(error)));
return Poll::Ready(Err(io::Error::other(error)));
}
OpCode::Binary | OpCode::Continuation => {
debug_assert!(this.recv.is_empty());

View File

@@ -173,7 +173,7 @@ impl CertResolver {
}
pub fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
self.certs.keys().cloned().collect()
}
}

View File

@@ -53,7 +53,7 @@ jsonnet = "^0.21.0-rc2"
[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"
ruff = "^0.7.0"
ruff = "^0.11.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
@@ -109,4 +109,5 @@ select = [
"W", # pycodestyle
"B", # bugbear
"UP", # pyupgrade
"TC", # flake8-type-checking
]

View File

@@ -8,9 +8,12 @@
from __future__ import annotations
import argparse
from typing import TYPE_CHECKING
import psycopg2
from psycopg2.extensions import connection as PgConnection
if TYPE_CHECKING:
from psycopg2.extensions import connection as PgConnection
def main(args: argparse.Namespace):

View File

@@ -7,13 +7,13 @@ import logging
import signal
import sys
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import TYPE_CHECKING
import aiohttp
if TYPE_CHECKING:
from collections.abc import Awaitable
from typing import Any

View File

@@ -24,9 +24,9 @@ use pageserver_api::controller_api::{
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
};
use pageserver_api::models::{
DetachBehavior, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest,
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -582,6 +582,32 @@ async fn handle_tenant_timeline_download_heatmap_layers(
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
service
.tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
.await?;
json_response(StatusCode::OK, ())
}
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
@@ -2192,6 +2218,17 @@ pub fn make_router(
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_lsn_lease,
RequestName("v1_tenant_timeline_lsn_lease"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

View File

@@ -1,6 +1,6 @@
use pageserver_api::models::detach_ancestor::AncestorDetached;
use pageserver_api::models::{
DetachBehavior, LocationConfig, LocationConfigListResponse, PageserverUtilization,
DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization,
SecondaryProgress, TenantScanRemoteStorageResponse, TenantShardSplitRequest,
TenantShardSplitResponse, TenantWaitLsnRequest, TimelineArchivalConfigRequest,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
@@ -10,6 +10,7 @@ use pageserver_client::BlockUnblock;
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -195,6 +196,22 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_lease_lsn(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease> {
measured_request!(
"timeline_lease_lsn",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.timeline_init_lsn_lease(tenant_shard_id, timeline_id, lsn)
.await
)
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -1369,6 +1369,65 @@ impl Persistence {
Ok(timeline_from_db)
}
/// Set `delete_at` for the given timeline
pub(crate) async fn timeline_set_deleted_at(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines;
let deletion_time = chrono::Local::now().to_utc();
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set(timelines::deleted_at.eq(Some(deletion_time)))
.execute(conn)
.await?;
match updated {
0 => Ok(()),
1 => Ok(()),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
updated
))),
}
})
})
.await
}
/// Load timeline from db. Returns `None` if not present.
///
/// Only works if `deleted_at` is set, so you should call [`Self::timeline_set_deleted_at`] before.
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
Box::pin(async move {
diesel::delete(dsl::timelines)
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
.filter(dsl::deleted_at.is_not_null())
.execute(conn)
.await?;
Ok(())
})
})
.await?;
Ok(())
}
/// Loads a list of all timelines from database.
pub(crate) async fn list_timelines_for_tenant(
&self,
@@ -1491,6 +1550,34 @@ impl Persistence {
Ok(timeline_from_db)
}
/// List pending operations for a given timeline (including tenant-global ones)
pub(crate) async fn list_pending_ops_for_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
let timelines_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(
dsl::timeline_id
.eq(timeline_id.to_string())
.or(dsl::timeline_id.eq("")),
)
.load(conn)
.await?;
Ok(from_db)
})
})
.await?;
Ok(timelines_from_db)
}
/// Delete all pending ops for the given timeline.
///
@@ -1974,7 +2061,7 @@ impl ToSql<crate::schema::sql_types::PgLsn, Pg> for LsnWrapper {
}
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[derive(Insertable, AsChangeset, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,

View File

@@ -12,7 +12,7 @@ use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
use anyhow::Context;
use context_iterator::TenantShardContextIterator;
@@ -34,7 +34,7 @@ use pageserver_api::controller_api::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode,
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
@@ -60,6 +60,7 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use utils::{failpoint_support, pausable_failpoint};
@@ -152,6 +153,7 @@ enum TenantOperations {
TimelineGcBlockUnblock,
DropDetached,
DownloadHeatmapLayers,
TimelineLsnLease,
}
#[derive(Clone, strum_macros::Display)]
@@ -3987,6 +3989,75 @@ impl Service {
Ok(())
}
pub(crate) async fn tenant_timeline_lsn_lease(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<LsnLease, ApiError> {
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineLsnLease,
)
.await;
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
}
}
targets
};
let res = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let mut valid_until = None;
for r in res {
match r {
Ok(lease) => {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Err(e) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
}
}
}
Ok(LsnLease {
valid_until: valid_until.unwrap_or_else(SystemTime::now),
})
}
pub(crate) async fn tenant_timeline_download_heatmap_layers(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -4,7 +4,7 @@ use std::time::Duration;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::{Rng, thread_rng};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;
@@ -64,17 +64,22 @@ impl ChaosInjector {
let mut interval = tokio::time::interval(self.interval);
#[derive(Debug)]
enum ChaosEvent {
ShuffleTenant,
ForceKill,
MigrationsToSecondary,
ForceKillController,
GracefulMigrationsAnywhere,
}
loop {
let cron_interval = self.get_cron_interval_sleep_future();
let chaos_type = tokio::select! {
_ = interval.tick() => {
ChaosEvent::ShuffleTenant
if thread_rng().gen_bool(0.5) {
ChaosEvent::MigrationsToSecondary
} else {
ChaosEvent::GracefulMigrationsAnywhere
}
}
Some(_) = maybe_sleep(cron_interval) => {
ChaosEvent::ForceKill
ChaosEvent::ForceKillController
}
_ = cancel.cancelled() => {
tracing::info!("Shutting down");
@@ -83,16 +88,29 @@ impl ChaosInjector {
};
tracing::info!("Chaos iteration: {chaos_type:?}...");
match chaos_type {
ChaosEvent::ShuffleTenant => {
self.inject_chaos().await;
ChaosEvent::MigrationsToSecondary => {
self.inject_migrations_to_secondary();
}
ChaosEvent::ForceKill => {
ChaosEvent::GracefulMigrationsAnywhere => {
self.inject_graceful_migrations_anywhere();
}
ChaosEvent::ForceKillController => {
self.force_kill().await;
}
}
}
}
fn is_shard_eligible_for_chaos(&self, shard: &TenantShard) -> bool {
// - Skip non-active scheduling policies, so that a shard with a policy like Pause can
// be pinned without being disrupted by us.
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}
/// If a shard has a secondary and attached location, then re-assign the secondary to be
/// attached and the attached to be secondary.
///
@@ -108,13 +126,7 @@ impl ChaosInjector {
.get_mut(&tenant_shard_id)
.expect("Held lock between choosing ID and this get");
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
// be pinned without being disrupted by us.
tracing::info!(
"Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
shard.get_scheduling_policy()
);
if !self.is_shard_eligible_for_chaos(shard) {
return;
}
@@ -152,7 +164,77 @@ impl ChaosInjector {
std::process::exit(1);
}
async fn inject_chaos(&mut self) {
// Unlike [`Self::inject_migrations_to_secondary`], this function will not only cut over to secondary, it
// will migrate a tenant to a random node in its home AZ using a graceful migration of the same type
// that my be initiated by an API caller using prewarm=true.
//
// This is a much more expensive operation in terms of I/O and time, as we will fully warm up
// some new location in order to migrate the tenant there. For that reason we do far fewer of these.
fn inject_graceful_migrations_anywhere(&mut self) {
let batch_size = 1;
let mut inner = self.service.inner.write().unwrap();
let (nodes, tenants, _scheduler) = inner.parts_mut();
let mut candidates = tenants
.values_mut()
.filter(|shard| self.is_shard_eligible_for_chaos(shard))
.collect::<Vec<_>>();
tracing::info!(
"Injecting chaos: found {} candidates for graceful migrations anywhere",
candidates.len()
);
let mut victims: Vec<&mut TenantShard> = Vec::new();
// Pick our victims: use a hand-rolled loop rather than choose_multiple() because we want
// to take the mutable refs from our candidates rather than ref'ing them.
while !candidates.is_empty() && victims.len() < batch_size {
let i = thread_rng().gen_range(0..candidates.len());
victims.push(candidates.swap_remove(i));
}
for victim in victims.into_iter() {
// Find a node in the same AZ as the shard, or if the shard has no AZ preference, which
// is not where they are currently attached.
let candidate_nodes = nodes
.values()
.filter(|node| {
if let Some(preferred_az) = victim.preferred_az() {
node.get_availability_zone_id() == preferred_az
} else if let Some(attached) = *victim.intent.get_attached() {
node.get_id() != attached
} else {
true
}
})
.collect::<Vec<_>>();
let Some(victim_node) = candidate_nodes.choose(&mut thread_rng()) else {
// This can happen if e.g. we are in a small region with only one pageserver per AZ.
tracing::info!(
"no candidate nodes found for migrating shard {tenant_shard_id} within its home AZ",
tenant_shard_id = victim.tenant_shard_id
);
continue;
};
// This doesn't change intent immediately: next iteration of Service::optimize_all should do that. We avoid
// doing it here because applying optimizations requires dropping lock to do some async work to check the optimisation
// is valid given remote state, and it would be a shame to duplicate that dance here.
tracing::info!(
"Injecting chaos: migrate {} to {}",
victim.tenant_shard_id,
victim_node
);
victim.set_preferred_node(Some(victim_node.get_id()));
}
}
/// Migrations of attached locations to their secondary location. This exercises reconciliation in general,
/// live migration in particular, and the pageserver code for cleanly shutting down and starting up tenants
/// during such migrations.
fn inject_migrations_to_secondary(&mut self) {
// Pick some shards to interfere with
let batch_size = 128;
let mut inner = self.service.inner.write().unwrap();

View File

@@ -160,9 +160,8 @@ pub(crate) struct ScheduleRequest {
}
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
#[allow(clippy::type_complexity)]
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), Arc<CancellationToken>>>,
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
cancel: CancellationToken,
}
@@ -172,13 +171,13 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
) -> Arc<CancellationToken> {
) -> CancellationToken {
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
cancel.cancel();
}
entry.insert(Arc::new(self.cancel.child_token())).clone()
entry.insert(self.cancel.child_token()).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
@@ -197,7 +196,7 @@ impl ReconcilerHandle {
pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}
@@ -243,7 +242,7 @@ impl SafekeeperReconciler {
.await;
}
}
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {
SafekeeperTimelineOpKind::Pull => {
@@ -300,36 +299,96 @@ impl SafekeeperReconciler {
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
self.reconcile_inner(
let deleted = self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
tracing::info!(%tenant_id, %timeline_id, "deleted timeline from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
} else {
self.reconcile_inner(
req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!("deleted tenant from {req_host}");
},
req_cancel,
)
.await;
let deleted = self
.reconcile_inner(
req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
},
req_cancel,
)
.await;
if deleted {
self.delete_tenant_timelines_from_db(tenant_id).await;
}
}
}
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
.service
.persistence
.list_pending_ops_for_timeline(tenant_id, timeline_id)
.await
{
Ok(list) => {
if !list.is_empty() {
tracing::info!(%tenant_id, %timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
return;
}
}
Err(e) => {
tracing::warn!(%tenant_id, %timeline_id, "couldn't query pending ops: {e}");
return;
}
}
tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
// In theory we could crash right after deleting the op from the db and right before reaching this,
// but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
if let Err(err) = self
.service
.persistence
.delete_timeline(tenant_id, timeline_id)
.await
{
tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
}
}
async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
let timeline_list = match self
.service
.persistence
.list_timelines_for_tenant(tenant_id)
.await
{
Ok(timeline_list) => timeline_list,
Err(e) => {
tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
return;
}
};
for timeline in timeline_list {
let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
continue;
};
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: Arc<CancellationToken>,
) where
req_cancel: CancellationToken,
) -> bool
where
F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
{
let jwt = self
@@ -373,11 +432,11 @@ impl SafekeeperReconciler {
req.safekeeper.skp.host
);
}
return;
return true;
}
Err(mgmt_api::Error::Cancelled) => {
// On cancellation, the code that issued it will take care of removing db entries (if needed)
return;
return false;
}
Err(e) => {
tracing::info!(

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -313,25 +313,32 @@ impl Service {
);
return Ok(());
};
self.persistence
.timeline_set_deleted_at(tenant_id, timeline_id)
.await?;
let all_sks = tl
.new_sk_set
.iter()
.flat_map(|sks| {
sks.iter()
.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
})
.chain(
tl.sk_set
.iter()
.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
)
.collect::<HashMap<_, _>>();
.flatten()
.chain(tl.sk_set.iter())
.collect::<HashSet<_>>();
// Schedule reconciliations
for &sk_id in all_sks.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: tl.generation,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: *sk_id,
};
tracing::info!("writing pending op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
{
let mut locked = self.inner.write().unwrap();
for (sk_id, kind) in all_sks {
let sk_id = NodeId(sk_id as u64);
for sk_id in all_sks {
let sk_id = NodeId(*sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
@@ -345,7 +352,7 @@ impl Service {
tenant_id,
timeline_id: Some(timeline_id),
generation: tl.generation as u32,
kind,
kind: SafekeeperTimelineOpKind::Delete,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
@@ -379,32 +386,50 @@ impl Service {
})
.collect::<Result<Vec<_>, ApiError>>()?;
// Remove pending ops from db.
// Remove pending ops from db, and set `deleted_at`.
// We cancel them in a later iteration once we hold the state lock.
for (timeline_id, _timeline) in timeline_list.iter() {
self.persistence
.remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
.await?;
self.persistence
.timeline_set_deleted_at(tenant_id, *timeline_id)
.await?;
}
let mut locked = self.inner.write().unwrap();
// The list of safekeepers that have any of the timelines
let mut sk_list = HashSet::new();
// List all pending ops for all timelines, cancel them
for (timeline_id, timeline) in timeline_list.iter() {
for (_timeline_id, timeline) in timeline_list.iter() {
let sk_iter = timeline
.sk_set
.iter()
.chain(timeline.new_sk_set.iter().flatten())
.map(|id| NodeId(*id as u64));
for sk_id in sk_iter.clone() {
sk_list.extend(sk_iter);
}
for &sk_id in sk_list.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: String::new(),
generation: i32::MAX,
op_kind: SafekeeperTimelineOpKind::Delete,
sk_id: sk_id.0 as i64,
};
tracing::info!("writing pending op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
let mut locked = self.inner.write().unwrap();
for (timeline_id, _timeline) in timeline_list.iter() {
for sk_id in sk_list.iter() {
locked
.safekeeper_reconcilers
.cancel_reconciles_for_timeline(sk_id, tenant_id, Some(*timeline_id));
.cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
}
sk_list.extend(sk_iter);
}
// unwrap is safe: we return above for an empty timeline list

View File

@@ -4,11 +4,15 @@ Run the regression tests on the cloud instance of Neon
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from fixtures.neon_fixtures import RemotePostgres
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from pathlib import Path
from fixtures.neon_fixtures import RemotePostgres
from fixtures.pg_version import PgVersion
@pytest.mark.timeout(7200)

View File

@@ -2,11 +2,12 @@ from __future__ import annotations
from dataclasses import dataclass
from enum import StrEnum
from typing import Any
from typing import TYPE_CHECKING, Any
import jwt
from fixtures.common_types import TenantId
if TYPE_CHECKING:
from fixtures.common_types import TenantId
@dataclass

View File

@@ -15,18 +15,20 @@ from typing import TYPE_CHECKING
import allure
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Mapping
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonPageserver
"""
This file contains fixtures for micro-benchmarks.

View File

@@ -11,7 +11,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, final
import pytest
from _pytest.fixtures import FixtureRequest
from typing_extensions import override
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -24,11 +23,14 @@ from fixtures.neon_fixtures import (
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.pg_stats import PgStatTable
if TYPE_CHECKING:
from collections.abc import Iterator
from _pytest.fixtures import FixtureRequest
from fixtures.pg_stats import PgStatTable
class PgCompare(ABC):
"""Common interface of all postgres implementations, useful for benchmarks.

View File

@@ -4,8 +4,6 @@ import concurrent.futures
from typing import TYPE_CHECKING
import pytest
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.common_types import TenantId
@@ -15,6 +13,9 @@ if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
class ComputeReconfigure:
def __init__(self, server: HTTPServer):

View File

@@ -5,6 +5,8 @@ import urllib.parse
import requests
from requests.adapters import HTTPAdapter
from fixtures.log_helper import log
class EndpointHttpClient(requests.Session):
def __init__(
@@ -51,6 +53,7 @@ class EndpointHttpClient(requests.Session):
def metrics(self) -> str:
res = self.get(f"http://localhost:{self.external_port}/metrics")
res.raise_for_status()
log.debug("raw compute metrics: %s", res.text)
return res.text
# Current compute status.

View File

@@ -147,7 +147,7 @@ def fast_import(
pg_distrib_dir,
pg_version,
workdir,
cleanup=not cast(bool, pytestconfig.getoption("--preserve-database-files")),
cleanup=not cast("bool", pytestconfig.getoption("--preserve-database-files")),
) as fi:
yield fi

View File

@@ -10,7 +10,6 @@ import asyncio
import collections
import io
import json
from collections.abc import AsyncIterable
from typing import TYPE_CHECKING, final
import pytest_asyncio
@@ -31,6 +30,7 @@ from h2.settings import SettingCodes
from typing_extensions import override
if TYPE_CHECKING:
from collections.abc import AsyncIterable
from typing import Any

View File

@@ -1,12 +1,15 @@
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING
from prometheus_client.parser import text_string_to_metric_families
from prometheus_client.samples import Sample
from fixtures.log_helper import log
if TYPE_CHECKING:
from prometheus_client.samples import Sample
class Metrics:
metrics: dict[str, list[Sample]]
@@ -168,7 +171,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

View File

@@ -7,7 +7,6 @@ import subprocess
import tempfile
import textwrap
from itertools import chain, product
from pathlib import Path
from typing import TYPE_CHECKING, cast
import toml
@@ -15,14 +14,15 @@ import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from pathlib import Path
from typing import (
Any,
cast,
)
from fixtures.pg_version import PgVersion
# Used to be an ABC. abc.ABC removed due to linter without name change.
class AbstractNeonCli:
@@ -36,7 +36,7 @@ class AbstractNeonCli:
self.extra_env = extra_env
self.binpath = binpath
COMMAND: str = cast(str, None) # To be overwritten by the derived class.
COMMAND: str = cast("str", None) # To be overwritten by the derived class.
def raw_cli(
self,

View File

@@ -14,14 +14,12 @@ import threading
import time
import uuid
from collections import defaultdict
from collections.abc import Iterable, Iterator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from functools import cached_property
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING, cast
from urllib.parse import quote, urlparse
@@ -34,19 +32,12 @@ import psycopg2.sql
import pytest
import requests
import toml
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from jwcrypto import jwk
from mypy_boto3_kms import KMSClient
from mypy_boto3_s3 import S3Client
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.auth_tokens import AuthKeys, TokenScope
@@ -60,7 +51,6 @@ from fixtures.common_types import (
)
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.h2server import H2Server
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.neon_cli import NeonLocalCli, Pagectl
@@ -78,7 +68,6 @@ from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
from fixtures.paths import get_test_repo_dir, shared_snapshot_dir
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
@@ -108,10 +97,21 @@ from fixtures.utils import (
from .neon_api import NeonAPI, NeonApiEndpoint
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Callable, Iterable, Iterator
from types import TracebackType
from typing import Any, Self, TypeVar
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from mypy_boto3_kms import KMSClient
from mypy_boto3_s3 import S3Client
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry
from fixtures.h2server import H2Server
from fixtures.paths import SnapshotDirLocked
from fixtures.pg_version import PgVersion
T = TypeVar("T")
@@ -497,9 +497,9 @@ class NeonEnvBuilder:
else:
self.pageserver_wal_receiver_protocol = PageserverWalReceiverProtocol.INTERPRETED
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
assert test_name.startswith("test_"), (
"Unexpectedly instantiated from outside a test function"
)
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
@@ -508,12 +508,12 @@ class NeonEnvBuilder:
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
), "the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
assert (
self.compatibility_pg_distrib_dir is not None
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
assert self.compatibility_neon_binpath is not None, (
"the environment variable COMPATIBILITY_NEON_BIN is required when using mixed versions"
)
assert self.compatibility_pg_distrib_dir is not None, (
"the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
)
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()
self.test_may_use_compatibility_snapshot_binaries = True
@@ -795,9 +795,9 @@ class NeonEnvBuilder:
work = ident_state_dir / "work"
assert upper.is_dir()
assert work.is_dir()
assert (
self.test_overlay_dir not in dst.parents
), "otherwise workdir cleanup below wouldn't work"
assert self.test_overlay_dir not in dst.parents, (
"otherwise workdir cleanup below wouldn't work"
)
# find index, still not mutating state
idxmap = {
existing_ident: idx
@@ -863,9 +863,9 @@ class NeonEnvBuilder:
self.pageserver_remote_storage = ret
def enable_safekeeper_remote_storage(self, kind: RemoteStorageKind):
assert (
self.safekeepers_remote_storage is None
), "safekeepers_remote_storage already configured"
assert self.safekeepers_remote_storage is None, (
"safekeepers_remote_storage already configured"
)
self.safekeepers_remote_storage = self._configure_and_create_remote_storage(
kind, RemoteStorageUser.SAFEKEEPER
@@ -1421,9 +1421,9 @@ class NeonEnv:
assert that there is only one. Tests with multiple pageservers should always use
get_pageserver with an explicit ID.
"""
assert (
len(self.pageservers) == 1
), "env.pageserver must only be used with single pageserver NeonEnv"
assert len(self.pageservers) == 1, (
"env.pageserver must only be used with single pageserver NeonEnv"
)
return self.pageservers[0]
def get_pageserver(self, id: int | None) -> NeonPageserver:
@@ -1614,7 +1614,7 @@ def neon_simple_env(
compatibility_pg_distrib_dir=compatibility_pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=cast("bool", pytestconfig.getoption("--preserve-database-files")),
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
@@ -1683,7 +1683,7 @@ def neon_env_builder(
combination=combination,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=cast("bool", pytestconfig.getoption("--preserve-database-files")),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
@@ -3577,9 +3577,9 @@ class NeonProxy(PgProtocol):
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
assert (
self._popen and self._popen.poll() is None
), "Proxy exited unexpectedly. Check test log."
assert self._popen and self._popen.poll() is None, (
"Proxy exited unexpectedly. Check test log."
)
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
def http_query(self, query, args, **kwargs):
@@ -3787,9 +3787,9 @@ class NeonAuthBroker:
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
assert (
self._popen and self._popen.poll() is None
), "Proxy exited unexpectedly. Check test log."
assert self._popen and self._popen.poll() is None, (
"Proxy exited unexpectedly. Check test log."
)
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
async def query(self, query, args, **kwargs):
@@ -4069,9 +4069,9 @@ class Endpoint(PgProtocol, LogUtils):
m = re.search(r"=\s*(\S+)", line)
assert m is not None, f"malformed config line {line}"
size = m.group(1)
assert size_to_bytes(size) >= size_to_bytes(
"1MB"
), "LFC size cannot be set less than 1MB"
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
lfc_path_escaped = str(lfc_path).replace("'", "''")
config_lines = [
f"neon.file_cache_path = '{lfc_path_escaped}'",
@@ -4082,12 +4082,12 @@ class Endpoint(PgProtocol, LogUtils):
] + config_lines
else:
for line in config_lines:
assert (
line.find("neon.max_file_cache_size") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
assert (
line.find("neon.file_cache_size_limit") == -1
), "Setting LFC parameters is not allowed when LFC is disabled"
assert line.find("neon.max_file_cache_size") == -1, (
"Setting LFC parameters is not allowed when LFC is disabled"
)
assert line.find("neon.file_cache_size_limit") == -1, (
"Setting LFC parameters is not allowed when LFC is disabled"
)
self.config(config_lines)
@@ -4209,7 +4209,7 @@ class Endpoint(PgProtocol, LogUtils):
# Write it back updated
with open(config_path, "w") as file:
log.info(json.dumps(dict(data_dict, **kwargs)))
log.debug(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
def respec_deep(self, **kwargs: Any) -> None:
@@ -4226,7 +4226,7 @@ class Endpoint(PgProtocol, LogUtils):
with open(config_path) as f:
data_dict: dict[str, Any] = json.load(f)
log.info("Current compute spec: %s", json.dumps(data_dict, indent=4))
log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4))
for key, value in kwargs.items():
if isinstance(value, dict):
@@ -4238,7 +4238,7 @@ class Endpoint(PgProtocol, LogUtils):
data_dict[key] = value
with open(config_path, "w") as file:
log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
json.dump(data_dict, file, indent=4)
def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
@@ -4925,9 +4925,9 @@ class StorageScrubber:
healthy = False
else:
for _, warnings in with_warnings.items():
assert (
len(warnings) > 0
), "with_warnings value should not be empty, running without verbose mode?"
assert len(warnings) > 0, (
"with_warnings value should not be empty, running without verbose mode?"
)
if not self._check_line_list_allowed(warnings):
healthy = False
break
@@ -4941,9 +4941,9 @@ class StorageScrubber:
healthy = False
else:
for _, errors in with_errors.items():
assert (
len(errors) > 0
), "with_errors value should not be empty, running without verbose mode?"
assert len(errors) > 0, (
"with_errors value should not be empty, running without verbose mode?"
)
if not self._check_line_list_allowed(errors):
healthy = False
break

View File

@@ -5,7 +5,10 @@ from __future__ import annotations
import argparse
import re
import sys
from collections.abc import Iterable
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterable
def scan_pageserver_log_for_errors(

View File

@@ -7,8 +7,7 @@ import string
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from typing import TYPE_CHECKING, Any
import requests
from requests.adapters import HTTPAdapter
@@ -26,6 +25,9 @@ from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.utils import EnhancedJSONEncoder, Fn
if TYPE_CHECKING:
from datetime import datetime
class PageserverApiException(Exception):
def __init__(self, message, status_code: int):

View File

@@ -4,18 +4,19 @@ import concurrent.futures
from typing import TYPE_CHECKING
import fixtures.pageserver.remote_storage
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
def single_timeline(
neon_env_builder: NeonEnvBuilder,

View File

@@ -5,11 +5,9 @@ import os
import queue
import shutil
import threading
from pathlib import Path
from typing import TYPE_CHECKING
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.common_types import (
InvalidFileName,
parse_layer_file_name,
@@ -17,8 +15,11 @@ from fixtures.pageserver.common_types import (
from fixtures.remote_storage import LocalFsStorage
if TYPE_CHECKING:
from pathlib import Path
from typing import Any
from fixtures.neon_fixtures import NeonEnv
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
remote_storage = env.pageserver_remote_storage

View File

@@ -3,13 +3,6 @@ from __future__ import annotations
import time
from typing import TYPE_CHECKING
from mypy_boto3_s3.type_defs import (
DeleteObjectOutputTypeDef,
EmptyResponseMetadataTypeDef,
ListObjectsV2OutputTypeDef,
ObjectTypeDef,
)
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
@@ -19,6 +12,13 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from typing import Any
from mypy_boto3_s3.type_defs import (
DeleteObjectOutputTypeDef,
EmptyResponseMetadataTypeDef,
ListObjectsV2OutputTypeDef,
ObjectTypeDef,
)
def assert_tenant_state(
pageserver_http: PageserverHttpClient,
@@ -241,9 +241,9 @@ def wait_for_upload_queue_empty(
found = False
for f in finished:
if all([s.labels[label] == f.labels[label] for label in remaining_labels]):
assert (
not found
), "duplicate match, remaining_labels don't uniquely identify sample"
assert not found, (
"duplicate match, remaining_labels don't uniquely identify sample"
)
tl.append((s.labels, int(s.value) - int(f.value)))
found = True
if not found:

View File

@@ -6,13 +6,14 @@ from typing import TYPE_CHECKING
import allure
import pytest
import toml
from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from typing import Any
from _pytest.python import Metafunc
"""
Dynamically parametrize tests by different parameters

View File

@@ -6,7 +6,6 @@ import subprocess
import threading
from fcntl import LOCK_EX, LOCK_UN, flock
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING
import pytest
@@ -18,6 +17,7 @@ from fixtures.utils import allure_attach_from_dir
if TYPE_CHECKING:
from collections.abc import Iterator
from types import TracebackType
BASE_DIR = Path(__file__).parents[2]
@@ -101,9 +101,9 @@ def compatibility_snapshot_dir() -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
return
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
assert compatibility_snapshot_dir_env is not None, (
"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg(PG_VERSION)` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
)
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
yield compatibility_snapshot_dir

View File

@@ -7,22 +7,24 @@ import os
import re
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING
import boto3
import toml
from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from typing_extensions import override
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump
if TYPE_CHECKING:
from pathlib import Path
from typing import Any
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TenantShardId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"
@@ -448,9 +450,9 @@ class RemoteStorageKind(StrEnum):
env_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
env_access_token = os.getenv("AWS_SESSION_TOKEN")
env_profile = os.getenv("AWS_PROFILE")
assert (
env_access_key and env_secret_key and env_access_token
) or env_profile, "need to specify either access key and secret access key or profile"
assert (env_access_key and env_secret_key and env_access_token) or env_profile, (
"need to specify either access key and secret access key or profile"
)
bucket_name = bucket_name or os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name is not None, "no remote storage bucket name provided"

View File

@@ -3,12 +3,11 @@ from __future__ import annotations
from collections.abc import MutableMapping
from typing import TYPE_CHECKING, cast
import pytest
if TYPE_CHECKING:
from collections.abc import MutableMapping
from typing import Any
import pytest
from _pytest.config import Config

View File

@@ -1,10 +1,14 @@
from __future__ import annotations
from fixtures.common_types import TenantId, TimelineId
from typing import TYPE_CHECKING
from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.common_types import TenantId, TimelineId
from fixtures.safekeeper.http import SafekeeperHttpClient
def wait_walreceivers_absent(
sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId

View File

@@ -3,12 +3,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
if TYPE_CHECKING:
from typing import Any
from _pytest.config import Config
from _pytest.config.argparsing import Parser
"""
This plugin allows tests to be marked as slow using pytest.mark.slow. By default slow

View File

@@ -5,9 +5,7 @@ from typing import TYPE_CHECKING
import pytest
import requests
from pytest_httpserver import HTTPServer
from werkzeug.datastructures import Headers
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
@@ -15,6 +13,9 @@ from fixtures.log_helper import log
if TYPE_CHECKING:
from typing import Any
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
class StorageControllerProxy:
def __init__(self, server: HTTPServer):

View File

@@ -19,7 +19,6 @@ from urllib.parse import urlencode
import allure
import pytest
import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override
from fixtures.common_types import Id, Lsn
@@ -34,6 +33,8 @@ if TYPE_CHECKING:
from collections.abc import Iterable
from typing import IO
from psycopg2.extensions import cursor
from fixtures.common_types import TimelineId
from fixtures.neon_fixtures import PgBin
@@ -512,7 +513,9 @@ def assert_no_errors(log_file: Path, service: str, allowed_errors: list[str]):
for _lineno, error in errors:
log.info(f"not allowed {service} error: {error.strip()}")
assert not errors, f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
assert not errors, (
f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
)
def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str]):
@@ -550,18 +553,18 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: set[str
left_list, right_list = map(build_hash_list, [left, right])
assert len(left_list) == len(
right_list
), f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"
assert len(left_list) == len(right_list), (
f"unexpected number of files on tar files, {len(left_list)} != {len(right_list)}"
)
mismatching: set[str] = set()
for left_tuple, right_tuple in zip(left_list, right_list, strict=False):
left_path, left_hash = left_tuple
right_path, right_hash = right_tuple
assert (
left_path == right_path
), f"file count matched, expected these to be same paths: {left_path}, {right_path}"
assert left_path == right_path, (
f"file count matched, expected these to be same paths: {left_path}, {right_path}"
)
if left_hash != right_hash:
mismatching.add(left_path)
@@ -721,3 +724,20 @@ def skip_on_ci(reason: str):
os.getenv("CI", "false") == "true",
reason=reason,
)
def shared_buffers_for_max_cu(max_cu: float) -> str:
"""
Returns the string value of shared_buffers for the given max CU.
Use shared_buffers size like in production for max CU compute.
See https://github.com/neondatabase/cloud/blob/877e33b4289a471b8f0a35c84009846358f3e5a3/goapp/controlplane/internal/pkg/compute/computespec/pg_settings.go#L405
e.g. // 2 CU: 225mb; 4 CU: 450mb; 8 CU: 900mb
"""
ramBytes = int(4096 * max_cu * 1024 * 1024)
maxConnections = max(100, min(int(ramBytes / 9531392), 4000))
maxWorkerProcesses = 12 + int(max_cu * 2)
maxBackends = 1 + maxConnections + maxWorkerProcesses
sharedBuffersMb = int(max(128, (1023 + maxBackends * 256) / 1024))
sharedBuffers = int(sharedBuffersMb * 1024 / 8)
return str(sharedBuffers)

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import threading
from typing import TYPE_CHECKING
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -17,6 +16,8 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
if TYPE_CHECKING:
from typing import Any
from fixtures.common_types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()

View File

@@ -7,14 +7,17 @@ from __future__ import annotations
import hashlib
import os
import time
from typing import TYPE_CHECKING
import clickhouse_connect
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
def query_clickhouse(
client,

View File

@@ -7,14 +7,17 @@ from __future__ import annotations
import json
import os
import time
from typing import TYPE_CHECKING
import psycopg2
import pytest
import requests
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
class DebeziumAPI:
"""

View File

@@ -7,18 +7,19 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_until_all_tenants_state
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
"""

View File

@@ -7,16 +7,19 @@ import threading
import time
import timeit
from contextlib import closing
from typing import TYPE_CHECKING
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import NeonPageserver
from prometheus_client.samples import Sample
def _record_branch_creation_durations(neon_compare: NeonCompare, durs: list[float]):
@@ -45,9 +48,9 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
tenant, _ = env.create_tenant(
conf={
"gc_period": "5 s",
"gc_horizon": f"{4 * 1024 ** 2}",
"checkpoint_distance": f"{2 * 1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"gc_horizon": f"{4 * 1024**2}",
"checkpoint_distance": f"{2 * 1024**2}",
"compaction_target_size": f"{1024**2}",
"compaction_threshold": "2",
# set PITR interval to be small, so we can do GC
"pitr_interval": "5 s",
@@ -82,10 +85,10 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
env.create_branch(f"b{i + 1}", ancestor_branch_name=f"b{p}", tenant_id=tenant)
dur = timeit.default_timer() - timer
log.info(f"Creating branch b{i+1} took {dur}s")
log.info(f"Creating branch b{i + 1} took {dur}s")
branch_creation_durations.append(dur)
threads.append(threading.Thread(target=run_pgbench, args=(f"b{i+1}",), daemon=True))
threads.append(threading.Thread(target=run_pgbench, args=(f"b{i + 1}",), daemon=True))
threads[-1].start()
for thread in threads:

View File

@@ -2,13 +2,16 @@ from __future__ import annotations
import timeit
from pathlib import Path
from typing import TYPE_CHECKING
from fixtures.benchmark_fixture import PgBenchRunResult
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import fork_at_current_lsn
from performance.test_perf_pgbench import utc_now_timestamp
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
# -----------------------------------------------------------------------
# Start of `test_compare_child_and_root_*` tests
# -----------------------------------------------------------------------

View File

@@ -1,10 +1,13 @@
from __future__ import annotations
import timeit
from typing import TYPE_CHECKING
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.neon_fixtures import NeonEnvBuilder
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
# Run bulk tenant creation test.
#

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.utils import shared_buffers_for_max_cu
#
@@ -20,7 +21,10 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
timeline_id = env.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("test_bulk_update")
# use shared_buffers size like in production for 8 CU compute
endpoint = env.endpoints.create_start(
"test_bulk_update", config_lines=[f"shared_buffers={shared_buffers_for_max_cu(8.0)}"]
)
cur = endpoint.connect().cursor()
cur.execute("set statement_timeout=0")

View File

@@ -1,12 +1,15 @@
from __future__ import annotations
from contextlib import closing
from typing import TYPE_CHECKING
import pytest
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_flush_lsn
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
#
# Test compaction and image layer creation performance.

View File

@@ -3,13 +3,16 @@ from __future__ import annotations
import os
import threading
import time
from typing import TYPE_CHECKING
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.pg_stats import PgStatTable
from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix
if TYPE_CHECKING:
from fixtures.compare_fixtures import PgCompare
from fixtures.pg_stats import PgStatTable
def get_seeds_matrix(default: int = 100):
seeds = os.getenv("TEST_PG_BENCH_SEEDS_MATRIX", default=str(default))

View File

@@ -1,10 +1,13 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnv
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@pytest.mark.timeout(120)

View File

@@ -1,9 +1,13 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# Just start and measure duration.

View File

@@ -2,11 +2,13 @@ from __future__ import annotations
from contextlib import closing
from io import BufferedReader, RawIOBase
from typing import final
from typing import TYPE_CHECKING, final
from fixtures.compare_fixtures import PgCompare
from typing_extensions import override
if TYPE_CHECKING:
from fixtures.compare_fixtures import PgCompare
@final
class CopyTestData(RawIOBase):

Some files were not shown because too many files have changed in this diff Show More