Compare commits

..

22 Commits

Author SHA1 Message Date
Heikki Linnakangas
8b9b395be4 temporary debugging of test_pgdata_import_smoke 2025-01-29 13:48:21 +02:00
alexanderlaw
4d2328ebe3 Fix C code to satisfy sanitizers (#10473) 2025-01-29 10:05:43 +00:00
a-masterov
9f81828429 Test extension upgrade compatibility (#10244)
## Problem
We have to test the extensions, shipped with Neon for compatibility
before the upgrade.
## Summary of changes
Added the test for compatibility with the upgraded extensions.
2025-01-29 09:19:11 +00:00
Arseny Sher
9ab13d6e2c Log statements in test_layer_map (#10554)
## Problem

test_layer_map doesn't log statements and it is not clear how long they
take.

## Summary of changes

Do log them.

ref https://github.com/neondatabase/neon/issues/10409
2025-01-29 09:16:00 +00:00
Alex Chi Z.
983e18e63e feat(pageserver): add compaction_upper_limit config (#10550)
## Problem

Follow-up of the incident, we should not use the same bound on
lower/upper limit of compaction files. This patch adds an upper bound
limit, which is set to 50 for now.

## Summary of changes

Add `compaction_upper_limit`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2025-01-28 23:18:32 +00:00
Alex Chi Z.
b735df6ff0 fix(pageserver): make image layer generation atomic (#10516)
## Problem

close https://github.com/neondatabase/neon/issues/8362

## Summary of changes

Use `BatchLayerWriter` to ensure we clean up image layers after failed
compaction.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-28 21:29:51 +00:00
Fedor Dikarev
68cf0ba439 run benchmark tests on small-metal runners (#10549)
## Problem
Ref: https://github.com/neondatabase/cloud/issues/23314

We suspect some inconsistency in Benchmark tests runs could be due to
different type of runners they are landed in.
To have that aligned in both terms: failure rates and benchmark results,
lets run them for now on `small-metal` servers and see the progress for
the tests stability.
 
## Summary of changes
2025-01-28 21:26:38 +00:00
Alexey Kondratov
d04d924649 feat(compute): Add some basic compute_ctl metrics (#10504)
## Problem

There are several parts of `compute_ctl` with a very low visibility of
errors:
1. DB migrations that run async in the background after compute start.
2. Requests made to control plane (currently only `GetSpec`).
3. Requests made to the remote extensions server.

## Summary of changes

Add new counters to quickly evaluate the amount of errors among the
fleet.

Part of neondatabase/cloud#17590
2025-01-28 19:24:07 +00:00
JC Grünhage
f5fdaa6dc6 feat(ci): generate basic release notes with links (#10511)
## Problem
https://github.com/neondatabase/neon/pull/10448 removed release notes,
because if their generation failed, the whole release was failing.
People liked them though, and wanted some basic release notes as a
fall-back instead of completely removing them.

## Summary of changes
Include basic release notes that link to the release PR and to a diff to
the previous release.
2025-01-28 19:13:39 +00:00
Vlad Lazar
c54cd9e76a storcon: signal LSN wait to pageserver during live migration (#10452)
## Problem

We've seen the ingest connection manager get stuck shortly after a
migration.

## Summary of changes

A speculative mitigation is to use the same mechanism as get page
requests for kicking LSN ingest. The connection manager monitors
LSN waits and queries the broker if no updates are received for the
timeline.

Closes https://github.com/neondatabase/neon/issues/10351
2025-01-28 17:33:07 +00:00
Erik Grinaker
1010b8add4 pageserver: add l0_flush_wait_upload setting (#10534)
## Problem

We need a setting to disable the flush upload wait, to test L0 flush
backpressure in staging.

## Summary of changes

Add `l0_flush_wait_upload` setting.
2025-01-28 17:21:05 +00:00
Folke Behrens
ae4b2af299 fix(proxy): Use correct identifier for usage metrics upload (#10538)
## Problem

The request data and usage metrics S3 requests use the same identifier
shown in logs, causing confusion about what type of upload failed.

## Summary of changes

Use the correct identifier for usage metrics uploads.

neondatabase/cloud#23084
2025-01-28 17:08:17 +00:00
Tristan Partin
15fecb8474 Update axum to 0.8.1 (#10332)
Only a few things that needed updating:

- async_trait was removed
- Message::Text takes a Utf8Bytes object instead of a String

Signed-off-by: Tristan Partin <tristan@neon.tech>
Co-authored-by: Conrad Ludgate <connor@neon.tech>
2025-01-28 15:32:59 +00:00
Erik Grinaker
47677ba578 pageserver: disable L0 backpressure by default (#10535)
## Problem

We'll need further improvements to compaction before enabling L0 flush
backpressure by default. See:
https://neondb.slack.com/archives/C033RQ5SPDH/p1738066068960519?thread_ts=1737818888.474179&cid=C033RQ5SPDH.

Touches #5415.

## Summary of changes

Disable `l0_flush_delay_threshold` by default.
2025-01-28 14:51:30 +00:00
Arpad Müller
83b6bfa229 Re-download layer if its local and on-disk metadata diverge (#10529)
In #10308, we noticed many warnings about the local layer having
different sizes on-disk compared to the metadata.

However, the layer downloader would never redownload layer files if the
sizes or generation numbers change. This is obviously a bug, which we
aim to fix with this PR.

This change also moves the code deciding what to do about a layer to a
dedicated function: before we handled the "routing" via control flow,
but now it's become too complicated and it is nicer to have the
different verdicts for a layer spelled out in a list/match.
2025-01-28 13:39:53 +00:00
Erik Grinaker
ed942b05f7 Revert "pageserver: revert flush backpressure" (#10402)" (#10533)
This reverts commit 9e55d79803.

We'll still need this until we can tune L0 flush backpressure and
compaction. I'll add a setting to disable this separately.
2025-01-28 13:33:58 +00:00
Vlad Lazar
62a717a2ca pageserver: use PS node id for SK appname (#10522)
## Problem

This one is fairly embarrassing. Safekeeper node id was used in the
pageserver application name
when connecting to safekeepers.

## Summary of changes

Use the right node id.

Closes https://github.com/neondatabase/neon/issues/10461
2025-01-28 13:11:51 +00:00
Peter Bendel
c8fbbb9b65 Test ingest_benchmark with different stripe size and also PostgreSQL version 17 (#10510)
We want to verify if pageserver stripe size has an impact on ingest
performance.
We want to verify if ingest performance has improved or regressed with
postgres version 17.

## Summary of changes

- Allow to create new project with different postgres versions
- allow to pre-shard new project with different stripe sizes instead of
relying on storage manager to shard_split the project once a threshold
is exceeded

Replaces https://github.com/neondatabase/neon/pull/10509

Test run https://github.com/neondatabase/neon/actions/runs/12986410381
2025-01-27 21:06:05 +00:00
John Spray
d73f4a6470 pageserver: retry wrapper on manifest upload (#10524)
## Problem

On remote storage errors (e.g. I/O timeout) uploading tenant manifest,
all of compaction could fail. This is a problem IRL because we shouldn't
abort compaction on a single IO error, and in tests because it generates
spurious failures.

Related:
https://github.com/orgs/neondatabase/projects/51/views/2?sliceBy%5Bvalue%5D=jcsp&pane=issue&itemId=93692919&issue=neondatabase%7Cneon%7C10389

## Summary of changes

- Use `backoff::retry` when uploading tenant manifest
2025-01-27 21:02:25 +00:00
Heikki Linnakangas
5477d7db93 fast_import: fixes for Postgres v17 (#10414)
Now that the tests are run on v17, they're also run in debug mode, which
is slow. Increase statement_timeout in the test to work around that.
2025-01-27 19:47:49 +00:00
Arpad Müller
eb9832d846 Remove PQ_LIB_DIR env var (#10526)
We now don't need libpq any more for the build of the storage
controller, as we use `diesel-async` since #10280. Therefore, we remove
the env var that gave cargo/rustc the location for libpq.

Follow-up of #10280
2025-01-27 19:38:18 +00:00
Christian Schwarz
3d36dfe533 fix: noisy broker subscription failed error during storage broker deploys (#10521)
During broker deploys, pageservers log this noisy WARN en masse.

I can trivially reproduce the WARN message in neon_local by SIGKILLing
broker during e.g. `pgbench -i`.

I don't understand why tonic is not detecting the error as
`Code::Unavailable`.

Until we find time to understand that / fix upstream, this PR adds the
error message to the existing list of known error messages that get
demoted to INFO level.

Refs:
-  refs https://github.com/neondatabase/neon/issues/9562
2025-01-27 19:19:55 +00:00
86 changed files with 1560 additions and 431 deletions

View File

@@ -4,6 +4,7 @@ self-hosted-runner:
- large
- large-arm64
- small
- small-metal
- small-arm64
- us-east-2
config-variables:

View File

@@ -17,6 +17,31 @@ inputs:
compute_units:
description: '[Min, Max] compute units'
default: '[1, 1]'
# settings below only needed if you want the project to be sharded from the beginning
shard_split_project:
description: 'by default new projects are not shard-split, specify true to shard-split'
required: false
default: 'false'
admin_api_key:
description: 'Admin API Key needed for shard-splitting. Must be specified if shard_split_project is true'
required: false
shard_count:
description: 'Number of shards to split the project into, only applies if shard_split_project is true'
required: false
default: '8'
stripe_size:
description: 'Stripe size, optional, in 8kiB pages. e.g. set 2048 for 16MB stripes. Default is 128 MiB, only applies if shard_split_project is true'
required: false
default: '32768'
psql_path:
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
required: false
default: '/tmp/neon/pg_install/v16/bin/psql'
libpq_lib_path:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
outputs:
dsn:
@@ -63,6 +88,23 @@ runs:
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
if [ "${SHARD_SPLIT_PROJECT}" = "true" ]; then
# determine tenant ID
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
echo "Splitting project ${project_id} with tenant_id ${TENANT_ID} into $((SHARD_COUNT)) shards with stripe size $((STRIPE_SIZE))"
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split"
echo "with body {\"new_shard_count\": $((SHARD_COUNT)), \"new_stripe_size\": $((STRIPE_SIZE))}"
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
curl -X PUT \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split" \
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"new_shard_count\": $SHARD_COUNT, \"new_stripe_size\": $STRIPE_SIZE}"
fi
env:
API_HOST: ${{ inputs.api_host }}
API_KEY: ${{ inputs.api_key }}
@@ -70,3 +112,9 @@ runs:
POSTGRES_VERSION: ${{ inputs.postgres_version }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
SHARD_SPLIT_PROJECT: ${{ inputs.shard_split_project }}
ADMIN_API_KEY: ${{ inputs.admin_api_key }}
SHARD_COUNT: ${{ inputs.shard_count }}
STRIPE_SIZE: ${{ inputs.stripe_size }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}

View File

@@ -158,8 +158,6 @@ jobs:
- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
@@ -217,8 +215,6 @@ jobs:
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib
export LD_LIBRARY_PATH

View File

@@ -235,7 +235,7 @@ jobs:
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build (only for v17)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu)
run: cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced (only for v17)
run: ./run_clippy.sh

View File

@@ -242,7 +242,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, small ]
runs-on: [ self-hosted, small-metal ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
@@ -786,6 +786,17 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Get the last compute release tag
id: get-last-compute-release-tag
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
tag=$(gh api -q '[.[].tag_name | select(startswith("release-compute"))][0]'\
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
"/repos/${{ github.repository }}/releases")
echo tag=${tag} >> ${GITHUB_OUTPUT}
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
# Regular pageserver version string looks like
@@ -817,6 +828,20 @@ jobs:
TEST_VERSION_ONLY: ${{ matrix.pg_version }}
run: ./docker-compose/docker_compose_test.sh
- name: Print logs and clean up docker-compose test
if: always()
run: |
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml logs || true
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml down
- name: Test extension upgrade
timeout-minutes: 20
if: ${{ needs.tag.outputs.build-tag == github.run_id }}
env:
NEWTAG: ${{ needs.tag.outputs.build-tag }}
OLDTAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
run: ./docker-compose/test_extensions_upgrade.sh
- name: Print logs and clean up
if: always()
run: |
@@ -1050,6 +1075,7 @@ jobs:
retries: 5
script: |
const tag = "${{ needs.tag.outputs.build-tag }}";
const branch = "${{ github.ref_name }}";
try {
const existingRef = await github.rest.git.getRef({
@@ -1092,12 +1118,48 @@ jobs:
}
console.log(`Release for tag ${tag} does not exist. Creating it...`);
// Find the PR number using the commit SHA
const pullRequests = await github.rest.pulls.list({
owner: context.repo.owner,
repo: context.repo.repo,
state: 'closed',
base: branch,
});
const pr = pullRequests.data.find(pr => pr.merge_commit_sha === context.sha);
const prNumber = pr ? pr.number : null;
// Find the previous release on the branch
const releases = await github.rest.repos.listReleases({
owner: context.repo.owner,
repo: context.repo.repo,
per_page: 100,
});
const branchReleases = releases.data
.filter((release) => {
const regex = new RegExp(`^${branch}-\\d+$`);
return regex.test(release.tag_name) && !release.draft && !release.prerelease;
})
.sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
const previousTag = branchReleases.length > 0 ? branchReleases[0].tag_name : null;
const releaseNotes = [
prNumber
? `Release PR https://github.com/${context.repo.owner}/${context.repo.repo}/pull/${prNumber}.`
: 'Release PR not found.',
previousTag
? `Diff with the previous release https://github.com/${context.repo.owner}/${context.repo.repo}/compare/${previousTag}...${tag}.`
: `No previous release found on branch ${branch}.`,
].join('\n\n');
await github.rest.repos.createRelease({
owner: context.repo.owner,
repo: context.repo.repo,
tag_name: tag,
// TODO: Automate release notes properly
generate_release_notes: false,
body: releaseNotes,
});
console.log(`Release for tag ${tag} created successfully.`);
}

View File

@@ -28,7 +28,24 @@ jobs:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
target_project: [new_empty_project, large_existing_project]
include:
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
- target_project: new_empty_project_stripe_size_32768
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
# while here it is sharded from the beginning with a shard size of 256 MiB
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
postgres_version: 17
- target_project: large_existing_project
stripe_size: null # cannot re-shared or choose different stripe size for existing, already sharded project
postgres_version: 16
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
contents: write
statuses: write
@@ -67,17 +84,21 @@ jobs:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: ${{ matrix.target_project == 'new_empty_project' }}
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
id: create-neon-project-ingest-target
uses: ./.github/actions/neon-project-create
with:
region_id: aws-us-east-2
postgres_version: 16
postgres_version: ${{ matrix.postgres_version }}
compute_units: '[7, 7]' # we want to test large compute here to avoid compute-side bottleneck
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
shard_split_project: ${{ matrix.stripe_size != null && 'true' || 'false' }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
shard_count: 8
stripe_size: ${{ matrix.stripe_size }}
- name: Initialize Neon project
if: ${{ matrix.target_project == 'new_empty_project' }}
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
env:
BENCHMARK_INGEST_TARGET_CONNSTR: ${{ steps.create-neon-project-ingest-target.outputs.dsn }}
NEW_PROJECT_ID: ${{ steps.create-neon-project-ingest-target.outputs.project_id }}
@@ -130,7 +151,7 @@ jobs:
test_selection: performance/test_perf_ingest_using_pgcopydb.py
run_in_parallel: false
extra_params: -s -m remote_cluster --timeout 86400 -k test_ingest_performance_using_pgcopydb
pg_version: v16
pg_version: v${{ matrix.postgres_version }}
save_perf_report: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
@@ -146,7 +167,7 @@ jobs:
${PSQL} "${BENCHMARK_INGEST_TARGET_CONNSTR}" -c "\dt+"
- name: Delete Neon Project
if: ${{ always() && matrix.target_project == 'new_empty_project' }}
if: ${{ always() && startsWith(matrix.target_project, 'new_empty_project') }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project-ingest-target.outputs.project_id }}

View File

@@ -114,7 +114,7 @@ jobs:
run: make walproposer-lib -j$(nproc)
- name: Produce the build stats
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc)
run: cargo build --all --release --timings -j$(nproc)
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4

125
Cargo.lock generated
View File

@@ -179,7 +179,7 @@ dependencies = [
"nom",
"num-traits",
"rusticata-macros",
"thiserror",
"thiserror 1.0.69",
"time",
]
@@ -718,14 +718,14 @@ dependencies = [
[[package]]
name = "axum"
version = "0.7.9"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"async-trait",
"axum-core",
"base64 0.22.1",
"bytes",
"form_urlencoded",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
@@ -733,7 +733,7 @@ dependencies = [
"hyper 1.4.1",
"hyper-util",
"itoa",
"matchit 0.7.0",
"matchit",
"memchr",
"mime",
"percent-encoding",
@@ -746,7 +746,7 @@ dependencies = [
"sha1",
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.24.0",
"tokio-tungstenite 0.26.1",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -755,11 +755,10 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.4.5"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
@@ -1130,7 +1129,7 @@ dependencies = [
"log",
"nix 0.25.1",
"regex",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@@ -1311,7 +1310,7 @@ dependencies = [
"serde_with",
"signal-hook",
"tar",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.7",
"tokio-stream",
@@ -1420,7 +1419,7 @@ dependencies = [
"serde",
"serde_json",
"storage_broker",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.7",
"tokio-util",
@@ -2264,7 +2263,7 @@ dependencies = [
"pin-project",
"rand 0.8.5",
"sha1",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-util",
]
@@ -3390,12 +3389,6 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "matchit"
version = "0.8.4"
@@ -3786,7 +3779,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"sha2",
"thiserror",
"thiserror 1.0.69",
"url",
]
@@ -3836,7 +3829,7 @@ dependencies = [
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror",
"thiserror 1.0.69",
"tracing",
]
@@ -3868,7 +3861,7 @@ dependencies = [
"opentelemetry_sdk",
"prost",
"reqwest",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@@ -3904,7 +3897,7 @@ dependencies = [
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tracing",
@@ -4018,7 +4011,7 @@ dependencies = [
"remote_storage",
"serde_json",
"svg_fmt",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"utils",
@@ -4094,7 +4087,7 @@ dependencies = [
"strum_macros",
"sysinfo",
"tenant_size_model",
"thiserror",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-epoll-uring",
@@ -4140,7 +4133,7 @@ dependencies = [
"storage_broker",
"strum",
"strum_macros",
"thiserror",
"thiserror 1.0.69",
"utils",
]
@@ -4155,7 +4148,7 @@ dependencies = [
"postgres",
"reqwest",
"serde",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.7",
"tokio-stream",
@@ -4559,7 +4552,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-pemfile 2.1.1",
"serde",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
@@ -4597,7 +4590,7 @@ dependencies = [
"pprof",
"regex",
"serde",
"thiserror",
"thiserror 1.0.69",
"tracing",
"utils",
]
@@ -4608,7 +4601,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"camino",
"thiserror",
"thiserror 1.0.69",
"tokio",
"workspace_hack",
]
@@ -4641,7 +4634,7 @@ dependencies = [
"smallvec",
"symbolic-demangle",
"tempfile",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@@ -4673,7 +4666,7 @@ dependencies = [
"postgres-protocol 0.6.4",
"rand 0.8.5",
"serde",
"thiserror",
"thiserror 1.0.69",
"tokio",
]
@@ -4744,7 +4737,7 @@ dependencies = [
"memchr",
"parking_lot 0.12.1",
"procfs",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@@ -4914,7 +4907,7 @@ dependencies = [
"strum",
"strum_macros",
"subtle",
"thiserror",
"thiserror 1.0.69",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
@@ -5311,7 +5304,7 @@ dependencies = [
"http 1.1.0",
"reqwest",
"serde",
"thiserror",
"thiserror 1.0.69",
"tower-service",
]
@@ -5331,7 +5324,7 @@ dependencies = [
"reqwest",
"reqwest-middleware",
"retry-policies",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tracing",
"wasm-timer",
@@ -5347,7 +5340,7 @@ dependencies = [
"async-trait",
"getrandom 0.2.11",
"http 1.1.0",
"matchit 0.8.4",
"matchit",
"opentelemetry",
"reqwest",
"reqwest-middleware",
@@ -5726,7 +5719,7 @@ dependencies = [
"storage_broker",
"strum",
"strum_macros",
"thiserror",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
@@ -5765,7 +5758,7 @@ dependencies = [
"reqwest",
"safekeeper_api",
"serde",
"thiserror",
"thiserror 1.0.69",
"utils",
"workspace_hack",
]
@@ -5974,7 +5967,7 @@ dependencies = [
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.69",
"time",
"url",
"uuid",
@@ -6046,7 +6039,7 @@ checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6"
dependencies = [
"percent-encoding",
"serde",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@@ -6208,7 +6201,7 @@ checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"thiserror 1.0.69",
"time",
]
@@ -6353,7 +6346,7 @@ dependencies = [
"serde_json",
"strum",
"strum_macros",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tracing",
@@ -6645,7 +6638,16 @@ version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
dependencies = [
"thiserror-impl 2.0.11",
]
[[package]]
@@ -6659,6 +6661,17 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "thiserror-impl"
version = "2.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "thread_local"
version = "1.1.7"
@@ -6815,7 +6828,7 @@ dependencies = [
"nix 0.26.4",
"once_cell",
"scopeguard",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-util",
"tracing",
@@ -6998,14 +7011,14 @@ dependencies = [
[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
checksum = "be4bf6fecd69fcdede0ec680aaf474cdab988f9de6bc73d3758f0160e3b7025a"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.24.0",
"tungstenite 0.26.1",
]
[[package]]
@@ -7315,16 +7328,16 @@ dependencies = [
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"thiserror 1.0.69",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.24.0"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
checksum = "413083a99c579593656008130e29255e54dcaae495be556cc26888f211648c24"
dependencies = [
"byteorder",
"bytes",
@@ -7334,7 +7347,7 @@ dependencies = [
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"thiserror 2.0.11",
"utf-8",
]
@@ -7529,7 +7542,7 @@ dependencies = [
"signal-hook",
"strum",
"strum_macros",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-tar",
@@ -7629,7 +7642,7 @@ dependencies = [
"remote_storage",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-util",
@@ -8158,7 +8171,7 @@ dependencies = [
"ring",
"signature 2.2.0",
"spki 0.7.3",
"thiserror",
"thiserror 1.0.69",
"zeroize",
]
@@ -8175,7 +8188,7 @@ dependencies = [
"nom",
"oid-registry",
"rusticata-macros",
"thiserror",
"thiserror 1.0.69",
"time",
]

View File

@@ -65,7 +65,7 @@ aws-smithy-types = "1.2"
aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.7.9", features = ["ws"] }
axum = { version = "0.8.1", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.70"

View File

@@ -45,7 +45,7 @@ COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -1486,7 +1486,6 @@ impl ComputeNode {
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
// Second, preload all remote extensions specified in the shared_preload_libraries
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
@@ -1909,9 +1908,8 @@ LIMIT 100",
.as_ref()
.ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
let mut libs_vec = Vec::new();
info!("parse shared_preload_libraries from spec.cluster.settings");
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
@@ -1919,9 +1917,9 @@ LIMIT 100",
.map(str::to_string)
.collect();
}
// This is used in neon_local and python tests
info!("parse shared_preload_libraries from provided postgresql.conf");
// that is used in neon_local and python tests
if let Some(conf) = &spec.cluster.postgresql_conf {
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
let mut shared_preload_libraries_line = "";
@@ -1945,10 +1943,7 @@ LIMIT 100",
// Assume that they are already present locally.
libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
info!(
"Downloading extensions specified in shared_preload_libraries: {:?}",
&libs_vec
);
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let mut download_tasks = Vec::new();
for library in &libs_vec {

View File

@@ -85,6 +85,8 @@ use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;
use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
@@ -148,18 +150,18 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
},
_ => {}
}
panic!("Unsupported Postgres version {human_version}");
panic!("Unsuported postgres version {human_version}");
}
/// Download the archive for a given extension,
/// unzip it, and place files in the appropriate locations (share/lib)
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
ext_remote_storage: &str,
pgbin: &str,
) -> Result<u64> {
info!("Downloading extension {:?} from {:?}", ext_name, ext_path);
info!("Download extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
@@ -200,23 +202,23 @@ pub async fn download_extension(
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
for paths in [sharedir_paths, libdir_paths] {
let (zip_dir, real_dir) = paths;
info!("Moving {zip_dir:?}/* to {real_dir:?}");
info!("mv {zip_dir:?}/* {real_dir:?}");
for file in std::fs::read_dir(zip_dir)? {
let old_file = file?.path();
let new_file =
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
info!("Moving {old_file:?} to {new_file:?}");
info!("moving {old_file:?} to {new_file:?}");
// extension download failed: Directory not empty (os error 39)
match std::fs::rename(old_file, new_file) {
Ok(()) => info!("Move succeeded"),
Ok(()) => info!("move succeeded"),
Err(e) => {
warn!("Move failed, probably because the extension already exists: {e}")
warn!("move failed, probably because the extension already exists: {e}")
}
}
}
}
info!("Done moving extension {ext_name}");
info!("done moving extension {ext_name}");
Ok(download_size)
}
@@ -239,16 +241,10 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
for (control_name, control_content) in &ext_data.control_data {
let control_path = local_sharedir.join(control_name);
if !control_path.exists() {
info!(
"Writing control file content {:?}: {:?}",
control_path, control_content
);
info!("writing file {:?}{:?}", control_path, control_content);
std::fs::write(control_path, control_content).unwrap();
} else {
warn!(
"Control file {:?} exists locally. Ignoring the version from the spec.",
control_path
);
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
}
}
}
@@ -256,27 +252,68 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
// Do request to extension storage proxy, i.e.
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HTTP GET and return the response body as bytes.
// using HHTP GET
// and return the response body as bytes
//
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", ext_remote_storage, ext_path);
info!("Download extension {:?} from uri {:?}", ext_path, uri);
let resp = reqwest::get(uri).await?;
REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();
match resp.status() {
match do_extension_server_request(&uri).await {
Ok(resp) => {
info!(
"Successfully downloaded remote extension data {:?}",
ext_path
);
Ok(resp)
}
Err((msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
REMOTE_EXT_REQUESTS_FAILED
.with_label_values(&[&status_str])
.inc();
bail!(msg);
}
}
}
// Do a single remote extensions server request.
// Return result or (error message + status code) in case of any failures.
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!("could not perform remote extensions server request: {}", e),
None,
)
})?;
let status = resp.status();
match status {
StatusCode::OK => match resp.bytes().await {
Ok(resp) => {
info!("Download extension {:?} completed successfully", ext_path);
Ok(resp)
}
Err(e) => bail!("could not deserialize remote extension response: {}", e),
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {}", e),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
_ => bail!(
"unexpected remote extension response status code: {}",
resp.status()
),
StatusCode::SERVICE_UNAVAILABLE => Err((
"remote extensions server is temporarily unavailable".to_string(),
Some(status),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
Some(status),
)),
}
}

View File

@@ -1,9 +1,6 @@
use std::ops::{Deref, DerefMut};
use axum::{
async_trait,
extract::{rejection::JsonRejection, FromRequest, Request},
};
use axum::extract::{rejection::JsonRejection, FromRequest, Request};
use compute_api::responses::GenericAPIError;
use http::StatusCode;
@@ -12,7 +9,6 @@ use http::StatusCode;
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Json<T>(pub T);
#[async_trait]
impl<S, T> FromRequest<S> for Json<T>
where
axum::Json<T>: FromRequest<S, Rejection = JsonRejection>,

View File

@@ -1,9 +1,6 @@
use std::ops::{Deref, DerefMut};
use axum::{
async_trait,
extract::{rejection::PathRejection, FromRequestParts},
};
use axum::extract::{rejection::PathRejection, FromRequestParts};
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
@@ -12,7 +9,6 @@ use http::{request::Parts, StatusCode};
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Path<T>(pub T);
#[async_trait]
impl<S, T> FromRequestParts<S> for Path<T>
where
axum::extract::Path<T>: FromRequestParts<S, Rejection = PathRejection>,

View File

@@ -1,9 +1,6 @@
use std::ops::{Deref, DerefMut};
use axum::{
async_trait,
extract::{rejection::QueryRejection, FromRequestParts},
};
use axum::extract::{rejection::QueryRejection, FromRequestParts};
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
@@ -12,7 +9,6 @@ use http::{request::Parts, StatusCode};
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Query<T>(pub T);
#[async_trait]
impl<S, T> FromRequestParts<S> for Query<T>
where
axum::extract::Query<T>: FromRequestParts<S, Rejection = QueryRejection>,

View File

@@ -2,17 +2,16 @@ use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use http::StatusCode;
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use metrics::{Encoder, TextEncoder};
use crate::{http::JsonResponse, installed_extensions};
use crate::{http::JsonResponse, metrics::collect};
/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = installed_extensions::collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();

View File

@@ -55,7 +55,7 @@ async fn serve(port: u16, compute: Arc<ComputeNode>) {
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route(
"/extension_server/*filename",
"/extension_server/{*filename}",
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))

View File

@@ -1,13 +1,10 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;
use anyhow::Result;
use postgres::{Client, NoTls};
use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::metrics::INSTALLED_EXTENSIONS;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
@@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
extensions: extensions_map.into_values().collect(),
})
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}

View File

@@ -16,6 +16,7 @@ pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod params;

View File

@@ -0,0 +1,90 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use once_cell::sync::Lazy;
pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});
// Normally, any HTTP API request is described by METHOD (e.g. GET, POST, etc.) + PATH,
// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec.
// And it's fair to call it a 'RPC' (Remote Procedure Call).
pub enum CPlaneRequestRPC {
GetSpec,
}
impl CPlaneRequestRPC {
pub fn as_str(&self) -> &str {
match self {
CPlaneRequestRPC::GetSpec => "GetSpec",
}
}
}
pub const UNKNOWN_HTTP_STATUS: &str = "unknown";
pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_total",
"Total number of control plane requests made by compute_ctl",
&["rpc"]
)
.expect("failed to define a metric")
});
pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_failed_total",
"Total number of failed control plane requests made by compute_ctl",
&["rpc", "http_status"]
)
.expect("failed to define a metric")
});
/// Total number of failed database migrations. Per-compute, this is actually a boolean metric,
/// either empty or with a single value (1, migration_id) because we stop at the first failure.
/// Yet, the sum over the fleet will provide the total number of failures.
pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_db_migration_failed_total",
"Total number of failed database migrations",
&["migration_id"]
)
.expect("failed to define a metric")
});
pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_total",
"Total number of requests made by compute_ctl to download extensions from S3 proxy",
// Do not use any labels like extension name yet.
// We can add them later if needed.
&[]
)
.expect("failed to define a metric")
});
pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_failed_total",
"Total number of failed requests to S3 proxy",
&["http_status"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(CPLANE_REQUESTS_FAILED.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect());
metrics
}

View File

@@ -1,7 +1,9 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::{Client, Transaction};
use tracing::info;
use tracing::{error, info};
use crate::metrics::DB_MIGRATION_FAILED;
/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
@@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}
/// Run an individual migration
fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
/// Run an individual migration in a separate transaction block.
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);
// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);
txn.simple_query(migration)
.with_context(|| format!("apply migration {migration_id}"))?;
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
}
txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
Ok(())
}
@@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let mut txn = self
.client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
.with_context(|| format!("running migration {migration_id}"))?;
txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
info!("Finished migration id={}", migration_id);
match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Err(e) => {
error!("Failed to run migration id={}: {}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
}
current_migration += 1;
}

View File

@@ -6,6 +6,9 @@ use std::path::Path;
use tracing::{error, info, instrument, warn};
use crate::config;
use crate::metrics::{
CPlaneRequestRPC, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS,
};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -19,7 +22,7 @@ use compute_api::spec::ComputeSpec;
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
) -> Result<ControlPlaneSpecResponse, (bool, String, Option<StatusCode>)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
@@ -28,34 +31,41 @@ fn do_control_plane_request(
(
true,
format!("could not perform spec request to control plane: {}", e),
None,
)
})?;
match resp.status() {
let status = resp.status();
match status {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {}", e),
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => {
Err((true, "control plane is temporarily unavailable".to_string()))
}
StatusCode::SERVICE_UNAVAILABLE => Err((
true,
"control plane is temporarily unavailable".to_string(),
Some(status),
)),
StatusCode::BAD_GATEWAY => {
// We have a problem with intermittent 502 errors now
// https://github.com/neondatabase/cloud/issues/2353
// It's fine to retry GET request in this case.
Err((true, "control plane request failed with 502".to_string()))
Err((
true,
"control plane request failed with 502".to_string(),
Some(status),
))
}
// Another code, likely 500 or 404, means that compute is unknown to the control plane
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!(
"unexpected control plane response status code: {}",
resp.status()
),
format!("unexpected control plane response status code: {}", status),
Some(status),
)),
}
}
@@ -82,6 +92,9 @@ pub fn get_spec_from_control_plane(
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str()])
.inc();
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok(None),
@@ -93,7 +106,13 @@ pub fn get_spec_from_control_plane(
}
}
},
Err((retry, msg)) => {
Err((retry, msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
CPLANE_REQUESTS_FAILED
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status_str])
.inc();
if retry {
Err(anyhow!(msg))
} else {

View File

@@ -347,6 +347,11 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
compaction_upper_limit: settings
.remove("compaction_upper_limit")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_upper_limit' as an integer")?,
compaction_algorithm: settings
.remove("compaction_algorithm")
.map(serde_json::from_str)
@@ -357,6 +362,11 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'l0_flush_delay_threshold' as an integer")?,
l0_flush_wait_upload: settings
.remove("l0_flush_wait_upload")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'l0_flush_wait_upload' as a boolean")?,
l0_flush_stall_threshold: settings
.remove("l0_flush_stall_threshold")
.map(|x| x.parse::<usize>())

View File

@@ -41,8 +41,8 @@ allow = [
"MIT",
"MPL-2.0",
"OpenSSL",
"Unicode-DFS-2016",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [

View File

@@ -20,30 +20,55 @@ while ! nc -z pageserver 6400; do
done
echo "Page server is ready."
echo "Create a tenant and timeline"
generate_id tenant_id
PARAMS=(
-X PUT
-H "Content-Type: application/json"
-d "{\"mode\": \"AttachedSingle\", \"generation\": 1, \"tenant_conf\": {}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/location_config"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
cp ${SPEC_FILE_ORG} ${SPEC_FILE}
generate_id timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
if [ -n "${TENANT_ID:-}" ] && [ -n "${TIMELINE_ID:-}" ]; then
tenant_id=${TENANT_ID}
timeline_id=${TIMELINE_ID}
else
echo "Check if a tenant present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant"
)
tenant_id=$(curl "${PARAMS[@]}" | jq -r .[0].id)
if [ -z "${tenant_id}" ] || [ "${tenant_id}" = null ]; then
echo "Create a tenant"
generate_id tenant_id
PARAMS=(
-X PUT
-H "Content-Type: application/json"
-d "{\"mode\": \"AttachedSingle\", \"generation\": 1, \"tenant_conf\": {}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/location_config"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
fi
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
if [ -z "${timeline_id}" ] || [ "${timeline_id}" = null ]; then
generate_id timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
fi
fi
echo "Overwrite tenant id and timeline id in spec file"
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
sed -i "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
cat ${SPEC_FILE}

View File

@@ -149,11 +149,13 @@ services:
args:
- REPOSITORY=${REPOSITORY:-neondatabase}
- COMPUTE_IMAGE=compute-node-v${PG_VERSION:-16}
- TAG=${TAG:-latest}
- TAG=${COMPUTE_TAG:-${TAG:-latest}}
- http_proxy=${http_proxy:-}
- https_proxy=${https_proxy:-}
environment:
- PG_VERSION=${PG_VERSION:-16}
- TENANT_ID=${TENANT_ID:-}
- TIMELINE_ID=${TIMELINE_ID:-}
#- RUST_BACKTRACE=1
# Mount the test files directly, for faster editing cycle.
volumes:

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression add_agg agg_oob auto_sparse card_op cast_shape copy_binary cumulative_add_cardinality_correction cumulative_add_comprehensive_promotion cumulative_add_sparse_edge cumulative_add_sparse_random cumulative_add_sparse_step cumulative_union_comprehensive cumulative_union_explicit_explicit cumulative_union_explicit_promotion cumulative_union_probabilistic_probabilistic cumulative_union_sparse_full_representation cumulative_union_sparse_promotion cumulative_union_sparse_sparse disable_hashagg equal explicit_thresh hash hash_any meta_func murmur_bigint murmur_bytea nosparse notequal scalar_oob storedproc transaction typmod typmod_insert union_op

View File

@@ -0,0 +1,27 @@
diff --git a/expected/hypopg.out b/expected/hypopg.out
index 90121d0..859260b 100644
--- a/expected/hypopg.out
+++ b/expected/hypopg.out
@@ -11,7 +11,8 @@ BEGIN
END;
$_$
LANGUAGE plpgsql;
-CREATE EXTENSION hypopg;
+CREATE EXTENSION IF NOT EXISTS hypopg;
+NOTICE: extension "hypopg" already exists, skipping
CREATE TABLE hypo (id integer, val text, "Id2" bigint);
INSERT INTO hypo SELECT i, 'line ' || i
FROM generate_series(1,100000) f(i);
diff --git a/test/sql/hypopg.sql b/test/sql/hypopg.sql
index 99722b0..8d6bacb 100644
--- a/test/sql/hypopg.sql
+++ b/test/sql/hypopg.sql
@@ -12,7 +12,7 @@ END;
$_$
LANGUAGE plpgsql;
-CREATE EXTENSION hypopg;
+CREATE EXTENSION IF NOT EXISTS hypopg;
CREATE TABLE hypo (id integer, val text, "Id2" bigint);

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --inputdir=test --dbname=contrib_regression hypopg hypo_brin hypo_index_part hypo_include hypo_hash hypo_hide_index

View File

@@ -0,0 +1,23 @@
diff --git a/expected/ip4r.out b/expected/ip4r.out
index 7527af3..b38ed29 100644
--- a/expected/ip4r.out
+++ b/expected/ip4r.out
@@ -1,6 +1,5 @@
--
/*CUT-HERE*/
-CREATE EXTENSION ip4r;
-- Check whether any of our opclasses fail amvalidate
DO $d$
DECLARE
diff --git a/sql/ip4r.sql b/sql/ip4r.sql
index 65c49ec..24ade09 100644
--- a/sql/ip4r.sql
+++ b/sql/ip4r.sql
@@ -1,7 +1,6 @@
--
/*CUT-HERE*/
-CREATE EXTENSION ip4r;
-- Check whether any of our opclasses fail amvalidate

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ip4r ip4r-softerr ip4r-v11

View File

@@ -0,0 +1,75 @@
diff --git a/expected/pg_cron-test.out b/expected/pg_cron-test.out
index d79d542..1663886 100644
--- a/expected/pg_cron-test.out
+++ b/expected/pg_cron-test.out
@@ -1,30 +1,3 @@
-CREATE EXTENSION pg_cron VERSION '1.0';
-SELECT extversion FROM pg_extension WHERE extname='pg_cron';
- extversion
-------------
- 1.0
-(1 row)
-
--- Test binary compatibility with v1.4 function signature.
-ALTER EXTENSION pg_cron UPDATE TO '1.4';
-SELECT cron.unschedule(job_name := 'no_such_job');
-ERROR: could not find valid entry for job 'no_such_job'
-SELECT cron.schedule('testjob', '* * * * *', 'SELECT 1');
- schedule
-----------
- 1
-(1 row)
-
-SELECT cron.unschedule('testjob');
- unschedule
-------------
- t
-(1 row)
-
--- Test cache invalidation
-DROP EXTENSION pg_cron;
-CREATE EXTENSION pg_cron VERSION '1.4';
-ALTER EXTENSION pg_cron UPDATE;
-- Vacuum every day at 10:00am (GMT)
SELECT cron.schedule('0 10 * * *', 'VACUUM');
schedule
@@ -300,8 +273,3 @@ SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;
SELECT cron.schedule('bad-last-dom-job1', '0 11 $foo * *', 'VACUUM FULL');
ERROR: invalid schedule: 0 11 $foo * *
HINT: Use cron format (e.g. 5 4 * * *), or interval format '[1-59] seconds'
--- cleaning
-DROP EXTENSION pg_cron;
-drop user pgcron_cront;
-drop database pgcron_dbno;
-drop database pgcron_dbyes;
diff --git a/sql/pg_cron-test.sql b/sql/pg_cron-test.sql
index 45f94d9..241cf73 100644
--- a/sql/pg_cron-test.sql
+++ b/sql/pg_cron-test.sql
@@ -1,17 +1,3 @@
-CREATE EXTENSION pg_cron VERSION '1.0';
-SELECT extversion FROM pg_extension WHERE extname='pg_cron';
--- Test binary compatibility with v1.4 function signature.
-ALTER EXTENSION pg_cron UPDATE TO '1.4';
-SELECT cron.unschedule(job_name := 'no_such_job');
-SELECT cron.schedule('testjob', '* * * * *', 'SELECT 1');
-SELECT cron.unschedule('testjob');
-
--- Test cache invalidation
-DROP EXTENSION pg_cron;
-CREATE EXTENSION pg_cron VERSION '1.4';
-
-ALTER EXTENSION pg_cron UPDATE;
-
-- Vacuum every day at 10:00am (GMT)
SELECT cron.schedule('0 10 * * *', 'VACUUM');
@@ -156,8 +142,3 @@ SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;
-- invalid last of day job
SELECT cron.schedule('bad-last-dom-job1', '0 11 $foo * *', 'VACUUM FULL');
--- cleaning
-DROP EXTENSION pg_cron;
-drop user pgcron_cront;
-drop database pgcron_dbno;
-drop database pgcron_dbyes;

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression pg_cron-test

View File

@@ -0,0 +1,18 @@
diff --git a/expected/pg_ivm.out b/expected/pg_ivm.out
index e8798ee..cca58d0 100644
--- a/expected/pg_ivm.out
+++ b/expected/pg_ivm.out
@@ -1,4 +1,3 @@
-CREATE EXTENSION pg_ivm;
GRANT ALL ON SCHEMA public TO public;
-- create a table to use as a basis for views and materialized views in various combinations
CREATE TABLE mv_base_a (i int, j int);
diff --git a/sql/pg_ivm.sql b/sql/pg_ivm.sql
index d3c1a01..9382d7f 100644
--- a/sql/pg_ivm.sql
+++ b/sql/pg_ivm.sql
@@ -1,4 +1,3 @@
-CREATE EXTENSION pg_ivm;
GRANT ALL ON SCHEMA public TO public;
-- create a table to use as a basis for views and materialized views in various combinations

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression pg_ivm create_immv refresh_immv

View File

@@ -0,0 +1,25 @@
diff --git a/expected/roaringbitmap.out b/expected/roaringbitmap.out
index de70531..a5f7c15 100644
--- a/expected/roaringbitmap.out
+++ b/expected/roaringbitmap.out
@@ -1,7 +1,6 @@
--
-- Test roaringbitmap extension
--
-CREATE EXTENSION if not exists roaringbitmap;
-- Test input and output
set roaringbitmap.output_format='array';
set extra_float_digits = 0;
diff --git a/sql/roaringbitmap.sql b/sql/roaringbitmap.sql
index a0e9c74..84bc966 100644
--- a/sql/roaringbitmap.sql
+++ b/sql/roaringbitmap.sql
@@ -2,8 +2,6 @@
-- Test roaringbitmap extension
--
-CREATE EXTENSION if not exists roaringbitmap;
-
-- Test input and output
set roaringbitmap.output_format='array';

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression roaringbitmap

View File

@@ -0,0 +1,24 @@
diff --git a/test/sql/base.sql b/test/sql/base.sql
index af599d8..2eed91b 100644
--- a/test/sql/base.sql
+++ b/test/sql/base.sql
@@ -2,7 +2,6 @@
BEGIN;
\i test/pgtap-core.sql
-\i sql/semver.sql
SELECT plan(334);
--SELECT * FROM no_plan();
diff --git a/test/sql/corpus.sql b/test/sql/corpus.sql
index 1f5f637..a519905 100644
--- a/test/sql/corpus.sql
+++ b/test/sql/corpus.sql
@@ -4,7 +4,6 @@ BEGIN;
-- Test the SemVer corpus from https://regex101.com/r/Ly7O1x/3/.
\i test/pgtap-core.sql
-\i sql/semver.sql
SELECT plan(71);
--SELECT * FROM no_plan();

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression base corpus

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression 002_uuid_generate_v7 003_uuid_v7_to_timestamptz 004_uuid_timestamptz_to_v7 005_uuid_v7_to_timestamp 006_uuid_timestamp_to_v7

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --use-existing --dbname=contrib_regression bit btree cast copy halfvec hnsw_bit hnsw_halfvec hnsw_sparsevec hnsw_vector ivfflat_bit ivfflat_halfvec ivfflat_vector sparsevec vector_type

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression plv8 plv8-errors scalar_args inline json startup_pre startup varparam json_conv jsonb_conv window guc es6 arraybuffer composites currentresource startup_perms bytea find_function_perms memory_limits reset show array_spread regression dialect bigint procedure

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression extension tables unit binary unicode prefix units time temperature functions language_functions round derived compare aggregate iec custom crosstab convert

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression prefix falcon explain queries

View File

@@ -0,0 +1,19 @@
diff --git a/expected/rum.out b/expected/rum.out
index 5966d19..8860b79 100644
--- a/expected/rum.out
+++ b/expected/rum.out
@@ -1,4 +1,3 @@
-CREATE EXTENSION rum;
CREATE TABLE test_rum( t text, a tsvector );
CREATE TRIGGER tsvectorupdate
BEFORE UPDATE OR INSERT ON test_rum
diff --git a/sql/rum.sql b/sql/rum.sql
index 8414bb9..898e6ab 100644
--- a/sql/rum.sql
+++ b/sql/rum.sql
@@ -1,5 +1,3 @@
-CREATE EXTENSION rum;
-
CREATE TABLE test_rum( t text, a tsvector );
CREATE TRIGGER tsvectorupdate

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression rum rum_validate rum_hash ruminv timestamp orderby orderby_hash altorder altorder_hash limits int2 int4 int8 float4 float8 money oid time timetz date interval macaddr inet cidr text varchar char bytea bit varbit numeric rum_weight expr array

View File

@@ -0,0 +1,93 @@
#!/bin/bash
set -eux -o pipefail
cd "$(dirname "${0}")"
# Takes a variable name as argument. The result is stored in that variable.
generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
if [ -z ${OLDTAG+x} ] || [ -z ${NEWTAG+x} ] || [ -z "${OLDTAG}" ] || [ -z "${NEWTAG}" ]; then
echo OLDTAG and NEWTAG must be defined
exit 1
fi
export PG_VERSION=${PG_VERSION:-16}
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
((TIME += 1 ))
sleep 1
done
if [ ${TIME} -gt 300 ]; then
echo Time is out.
exit 2
fi
}
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
done
}
EXTENSIONS='[
{"extname": "plv8", "extdir": "plv8-src"},
{"extname": "vector", "extdir": "pgvector-src"},
{"extname": "unit", "extdir": "postgresql-unit-src"},
{"extname": "hypopg", "extdir": "hypopg-src"},
{"extname": "rum", "extdir": "rum-src"},
{"extname": "ip4r", "extdir": "ip4r-src"},
{"extname": "prefix", "extdir": "prefix-src"},
{"extname": "hll", "extdir": "hll-src"},
{"extname": "pg_cron", "extdir": "pg_cron-src"},
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
create_extensions "${EXTNAMES}"
query="select json_object_agg(extname,extversion) from pg_extension where extname in ('${EXTNAMES// /\',\'}')"
new_vers=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
docker compose --profile test-extensions down
TAG=${OLDTAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
wait_for_ready
docker compose cp ext-src neon-test-extensions:/
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
create_extensions "${EXTNAMES}"
query="select pge.extname from pg_extension pge join (select key as extname, value as extversion from json_each_text('${new_vers}')) x on pge.extname=x.extname and pge.extversion <> x.extversion"
exts=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
timeline_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
for ext in ${exts}; do
echo Testing ${ext}...
EXTDIR=$(echo ${EXTENSIONS} | jq -r '.[] | select(.extname=="'${ext}'") | .extdir')
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${timeline_id}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} TAG=${OLDTAG} docker compose down compute compute_is_ready
COMPUTE_TAG=${NEWTAG} TAG=${OLDTAG} TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ ${TID} != ${new_timeline_id} ]; then
echo Timeline mismatch
exit 1
fi
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh
docker compose exec neon-test-extensions psql -d contrib_regression -c "alter extension ${ext} update"
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
done
fi

View File

@@ -256,16 +256,24 @@ pub struct TenantConfigToml {
pub compaction_period: Duration,
/// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
/// Controls the amount of L0 included in a single compaction iteration.
/// The unit is `checkpoint_distance`, i.e., a size.
/// We add L0s to the set of layers to compact until their cumulative
/// size exceeds `compaction_upper_limit * checkpoint_distance`.
pub compaction_upper_limit: usize,
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
/// Level0 delta layer threshold at which to delay layer flushes for compaction backpressure,
/// such that they take 2x as long, and start waiting for layer flushes during ephemeral layer
/// rolls. This helps compaction keep up with WAL ingestion, and avoids read amplification
/// blowing up. Should be >compaction_threshold. If None, defaults to 2 * compaction_threshold.
/// 0 to disable.
/// blowing up. Should be >compaction_threshold. 0 to disable. Disabled by default.
pub l0_flush_delay_threshold: Option<usize>,
/// Level0 delta layer threshold at which to stall layer flushes. 0 to disable. If None,
/// defaults to 4 * compaction_threshold. Must be >compaction_threshold to avoid deadlock.
/// Level0 delta layer threshold at which to stall layer flushes. Must be >compaction_threshold
/// to avoid deadlock. 0 to disable. Disabled by default.
pub l0_flush_stall_threshold: Option<usize>,
/// If true, Level0 delta layer flushes will wait for S3 upload before flushing the next
/// layer. This is a temporary backpressure mechanism which should be removed once
/// l0_flush_{delay,stall}_threshold is fully enabled.
pub l0_flush_wait_upload: bool,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
@@ -520,9 +528,17 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
// This value needs to be tuned to avoid OOM. We have 3/4 of the total CPU threads to do background works, that's 16*3/4=9 on
// most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole
// calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50;
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
pub const DEFAULT_L0_FLUSH_WAIT_UPLOAD: bool = true;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -558,11 +574,13 @@ impl Default for TenantConfigToml {
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
compaction_upper_limit: DEFAULT_COMPACTION_UPPER_LIMIT,
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
},
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),

View File

@@ -282,7 +282,7 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode,
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum TimelineCreateRequestMode {
Branch {
@@ -307,7 +307,7 @@ pub enum TimelineCreateRequestMode {
},
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineCreateRequestModeImportPgdata {
pub location: ImportPgdataLocation,
pub idempotency_key: ImportPgdataIdempotencyKey,
@@ -326,7 +326,7 @@ pub enum ImportPgdataLocation {
},
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(transparent)]
pub struct ImportPgdataIdempotencyKey(pub String);
@@ -458,6 +458,8 @@ pub struct TenantConfigPatch {
pub compaction_period: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_upper_limit: FieldPatch<usize>,
// defer parsing compaction_algorithm, like eviction_policy
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
@@ -466,6 +468,8 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_wait_upload: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_horizon: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_period: FieldPatch<String>,
@@ -520,10 +524,12 @@ pub struct TenantConfig {
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub compaction_upper_limit: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
pub l0_flush_delay_threshold: Option<usize>,
pub l0_flush_stall_threshold: Option<usize>,
pub l0_flush_wait_upload: Option<bool>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
@@ -556,9 +562,11 @@ impl TenantConfig {
mut compaction_target_size,
mut compaction_period,
mut compaction_threshold,
mut compaction_upper_limit,
mut compaction_algorithm,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
@@ -590,6 +598,9 @@ impl TenantConfig {
.apply(&mut compaction_target_size);
patch.compaction_period.apply(&mut compaction_period);
patch.compaction_threshold.apply(&mut compaction_threshold);
patch
.compaction_upper_limit
.apply(&mut compaction_upper_limit);
patch.compaction_algorithm.apply(&mut compaction_algorithm);
patch
.l0_flush_delay_threshold
@@ -597,6 +608,7 @@ impl TenantConfig {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch.gc_period.apply(&mut gc_period);
patch
@@ -648,9 +660,11 @@ impl TenantConfig {
compaction_target_size,
compaction_period,
compaction_threshold,
compaction_upper_limit,
compaction_algorithm,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,
@@ -1015,6 +1029,13 @@ pub struct TenantConfigPatchRequest {
pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantWaitLsnRequest {
#[serde(flatten)]
pub timelines: HashMap<TimelineId, Lsn>,
pub timeout: Duration,
}
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "slug", content = "data", rename_all = "snake_case")]

View File

@@ -7,7 +7,7 @@
//! (notifying it of upscale).
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
@@ -82,21 +82,21 @@ impl Dispatcher {
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
Ok(version) => {
sink.send(Message::Text(
sink.send(Message::Text(Utf8Bytes::from(
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
))
)))
.await
.context("failed to notify agent of negotiated protocol version")?;
version
}
Err(e) => {
sink.send(Message::Text(
sink.send(Message::Text(Utf8Bytes::from(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
agent_range, monitor_range
)))
.unwrap(),
))
)))
.await
.context("failed to notify agent of no overlap between protocol version ranges")?;
Err(e).context("error determining suitable protocol version range")?
@@ -126,7 +126,7 @@ impl Dispatcher {
let json = serde_json::to_string(&message).context("failed to serialize message")?;
self.sink
.send(Message::Text(json))
.send(Message::Text(Utf8Bytes::from(json)))
.await
.context("stream error sending message")
}

View File

@@ -763,4 +763,19 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn wait_lsn(
&self,
tenant_shard_id: TenantShardId,
request: TenantWaitLsnRequest,
) -> Result<StatusCode> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/wait_lsn",
self.mgmt_api_endpoint,
);
self.request_noerror(Method::POST, uri, request)
.await
.map(|resp| resp.status())
}
}

View File

@@ -160,9 +160,12 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
// Fill in and thicken rectangle if it's an
// image layer so that we can see it.
let mut style = Style::default();
style.fill = Fill::Color(rgb(0x80, 0x80, 0x80));
style.stroke = Stroke::Color(rgb(0, 0, 0), 0.5);
let mut style = Style {
fill: Fill::Color(rgb(0x80, 0x80, 0x80)),
stroke: Stroke::Color(rgb(0, 0, 0), 0.5),
opacity: 1.0,
stroke_opacity: 1.0,
};
let y_start = lsn_max - lsn_start;
let y_end = lsn_max - lsn_end;
@@ -214,10 +217,6 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
files_seen.insert(f);
}
let mut record_style = Style::default();
record_style.fill = Fill::Color(rgb(0x80, 0x80, 0x80));
record_style.stroke = Stroke::None;
writeln!(svg, "{}", EndSvg)?;
let mut layer_events_str = String::new();

View File

@@ -984,6 +984,8 @@ components:
type: string
compaction_threshold:
type: string
compaction_upper_limit:
type: string
image_creation_threshold:
type: integer
walreceiver_connect_timeout:

View File

@@ -10,6 +10,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::future::join_all;
use futures::StreamExt;
use futures::TryFutureExt;
use humantime::format_rfc3339;
@@ -40,6 +41,7 @@ use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TenantWaitLsnRequest;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelineCreateRequestMode;
use pageserver_api::models::TimelineCreateRequestModeImportPgdata;
@@ -95,6 +97,8 @@ use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::timeline::WaitLsnTimeout;
use crate::tenant::timeline::WaitLsnWaiter;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
@@ -2790,6 +2794,63 @@ async fn secondary_download_handler(
json_response(status, progress)
}
async fn wait_lsn_handler(
mut request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let wait_lsn_request: TenantWaitLsnRequest = json_request(&mut request).await?;
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let mut wait_futures = Vec::default();
for timeline in tenant.list_timelines() {
let Some(lsn) = wait_lsn_request.timelines.get(&timeline.timeline_id) else {
continue;
};
let fut = {
let timeline = timeline.clone();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
async move {
timeline
.wait_lsn(
*lsn,
WaitLsnWaiter::HttpEndpoint,
WaitLsnTimeout::Custom(wait_lsn_request.timeout),
&ctx,
)
.await
}
};
wait_futures.push(fut);
}
if wait_futures.is_empty() {
return json_response(StatusCode::NOT_FOUND, ());
}
let all_done = tokio::select! {
results = join_all(wait_futures) => {
results.iter().all(|res| res.is_ok())
},
_ = cancel.cancelled() => {
return Err(ApiError::Cancelled);
}
};
let status = if all_done {
StatusCode::OK
} else {
StatusCode::ACCEPTED
};
json_response(status, ())
}
async fn secondary_status_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3577,6 +3638,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
api_handler(r, secondary_download_handler)
})
.post("/v1/tenant/:tenant_shard_id/wait_lsn", |r| {
api_handler(r, wait_lsn_handler)
})
.put("/v1/tenant/:tenant_shard_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -3,7 +3,7 @@ use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -398,6 +398,15 @@ pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
@@ -2569,6 +2578,7 @@ pub(crate) struct TimelineMetrics {
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_delay_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
@@ -2620,6 +2630,9 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
@@ -2766,6 +2779,7 @@ impl TimelineMetrics {
timeline_id,
flush_time_histo,
flush_delay_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
@@ -2815,6 +2829,14 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}
pub(crate) fn shutdown(&self) {
let was_shutdown = self
.shutdown
@@ -2832,6 +2854,7 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());

View File

@@ -1708,6 +1708,7 @@ impl PageServerHandler {
.wait_lsn(
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;
@@ -2044,6 +2045,7 @@ impl PageServerHandler {
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
crate::tenant::timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;

View File

@@ -37,6 +37,8 @@ use remote_timeline_client::manifest::{
OffloadedTimelineManifest, TenantManifest, LATEST_TENANT_MANIFEST_VERSION,
};
use remote_timeline_client::UploadQueueNotReadyError;
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
@@ -2558,7 +2560,12 @@ impl Tenant {
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
ancestor_timeline
.wait_lsn(*lsn, timeline::WaitLsnWaiter::Tenant, ctx)
.wait_lsn(
*lsn,
timeline::WaitLsnWaiter::Tenant,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await
.map_err(|e| match e {
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState { .. }) => {
@@ -3809,6 +3816,13 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.compaction_upper_limit
.unwrap_or(self.conf.default_tenant_conf.compaction_upper_limit)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -5308,27 +5322,37 @@ impl Tenant {
return Ok(());
}
upload_tenant_manifest(
&self.remote_storage,
&self.tenant_shard_id,
self.generation,
&manifest,
// Remote storage does no retries internally, so wrap it
match backoff::retry(
|| async {
upload_tenant_manifest(
&self.remote_storage,
&self.tenant_shard_id,
self.generation,
&manifest,
&self.cancel,
)
.await
},
|_e| self.cancel.is_cancelled(),
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"uploading tenant manifest",
&self.cancel,
)
.await
.map_err(|e| {
if self.cancel.is_cancelled() {
TenantManifestError::Cancelled
} else {
TenantManifestError::RemoteStorage(e)
{
None => Err(TenantManifestError::Cancelled),
Some(Err(_)) if self.cancel.is_cancelled() => Err(TenantManifestError::Cancelled),
Some(Err(e)) => Err(TenantManifestError::RemoteStorage(e)),
Some(Ok(_)) => {
// Store the successfully uploaded manifest, so that future callers can avoid
// re-uploading the same thing.
*guard = Some(manifest);
Ok(())
}
})?;
// Store the successfully uploaded manifest, so that future callers can avoid
// re-uploading the same thing.
*guard = Some(manifest);
Ok(())
}
}
}
@@ -5452,9 +5476,11 @@ pub(crate) mod harness {
compaction_target_size: Some(tenant_conf.compaction_target_size),
compaction_period: Some(tenant_conf.compaction_period),
compaction_threshold: Some(tenant_conf.compaction_threshold),
compaction_upper_limit: Some(tenant_conf.compaction_upper_limit),
compaction_algorithm: Some(tenant_conf.compaction_algorithm),
l0_flush_delay_threshold: tenant_conf.l0_flush_delay_threshold,
l0_flush_stall_threshold: tenant_conf.l0_flush_stall_threshold,
l0_flush_wait_upload: Some(tenant_conf.l0_flush_wait_upload),
gc_horizon: Some(tenant_conf.gc_horizon),
gc_period: Some(tenant_conf.gc_period),
image_creation_threshold: Some(tenant_conf.image_creation_threshold),

View File

@@ -277,6 +277,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub compaction_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub compaction_upper_limit: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
@@ -289,6 +293,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub l0_flush_stall_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub l0_flush_wait_upload: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_horizon: Option<u64>,
@@ -397,6 +405,9 @@ impl TenantConfOpt {
compaction_threshold: self
.compaction_threshold
.unwrap_or(global_conf.compaction_threshold),
compaction_upper_limit: self
.compaction_upper_limit
.unwrap_or(global_conf.compaction_upper_limit),
compaction_algorithm: self
.compaction_algorithm
.as_ref()
@@ -408,6 +419,9 @@ impl TenantConfOpt {
l0_flush_stall_threshold: self
.l0_flush_stall_threshold
.or(global_conf.l0_flush_stall_threshold),
l0_flush_wait_upload: self
.l0_flush_wait_upload
.unwrap_or(global_conf.l0_flush_wait_upload),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self
@@ -471,9 +485,11 @@ impl TenantConfOpt {
mut compaction_target_size,
mut compaction_period,
mut compaction_threshold,
mut compaction_upper_limit,
mut compaction_algorithm,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
@@ -511,6 +527,9 @@ impl TenantConfOpt {
.map(|v| humantime::parse_duration(&v))?
.apply(&mut compaction_period);
patch.compaction_threshold.apply(&mut compaction_threshold);
patch
.compaction_upper_limit
.apply(&mut compaction_upper_limit);
patch.compaction_algorithm.apply(&mut compaction_algorithm);
patch
.l0_flush_delay_threshold
@@ -518,6 +537,7 @@ impl TenantConfOpt {
patch
.l0_flush_stall_threshold
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch
.gc_period
@@ -587,9 +607,11 @@ impl TenantConfOpt {
compaction_target_size,
compaction_period,
compaction_threshold,
compaction_upper_limit,
compaction_algorithm,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
gc_horizon,
gc_period,
image_creation_threshold,
@@ -647,8 +669,10 @@ impl From<TenantConfOpt> for models::TenantConfig {
compaction_target_size: value.compaction_target_size,
compaction_period: value.compaction_period.map(humantime),
compaction_threshold: value.compaction_threshold,
compaction_upper_limit: value.compaction_upper_limit,
l0_flush_delay_threshold: value.l0_flush_delay_threshold,
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
l0_flush_wait_upload: value.l0_flush_wait_upload,
gc_horizon: value.gc_horizon,
gc_period: value.gc_period.map(humantime),
image_creation_threshold: value.image_creation_threshold,

View File

@@ -1643,6 +1643,7 @@ impl TenantManager {
.wait_lsn(
*target_lsn,
crate::tenant::timeline::WaitLsnWaiter::Tenant,
crate::tenant::timeline::WaitLsnTimeout::Default,
ctx,
)
.await

View File

@@ -222,6 +222,10 @@ impl LayerFileMetadata {
shard,
}
}
/// Helper to get both generation and file size in a tuple
pub fn generation_file_size(&self) -> (Generation, u64) {
(self.generation, self.file_size)
}
}
/// Limited history of earlier ancestors.

View File

@@ -559,6 +559,13 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
}
}
enum LayerAction {
Download,
NoAction,
Skip,
Touch,
}
/// This type is a convenience to group together the various functions involved in
/// freshening a secondary tenant.
struct TenantDownloader<'a> {
@@ -1008,69 +1015,17 @@ impl<'a> TenantDownloader<'a> {
return (Err(UpdateError::Restart), touched);
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
tracing::debug!("Layer {} is already on disk", layer.name);
if cfg!(debug_assertions) {
// Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
// are already present on disk are really there.
match tokio::fs::metadata(&on_disk.local_path).await {
Ok(meta) => {
tracing::debug!(
"Layer {} present at {}, size {}",
layer.name,
on_disk.local_path,
meta.len(),
);
}
Err(e) => {
tracing::warn!(
"Layer {} not found at {} ({})",
layer.name,
on_disk.local_path,
e
);
debug_assert!(false);
}
}
}
if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
// We already have this layer on disk. Update its access time.
tracing::debug!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
} else {
tracing::debug!("Layer {} not present on disk yet", layer.name);
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
match self.layer_action(&timeline_state, &layer).await {
LayerAction::Download => (),
LayerAction::NoAction => continue,
LayerAction::Skip => {
self.skip_layer(layer);
continue;
}
LayerAction::Touch => {
touched.push(layer);
continue;
}
}
match self
@@ -1091,6 +1046,86 @@ impl<'a> TenantDownloader<'a> {
(Ok(()), touched)
}
async fn layer_action(
&self,
timeline_state: &SecondaryDetailTimeline,
layer: &HeatMapLayer,
) -> LayerAction {
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
tracing::debug!("Layer {} is already on disk", layer.name);
if cfg!(debug_assertions) {
// Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
// are already present on disk are really there.
match tokio::fs::metadata(&on_disk.local_path).await {
Ok(meta) => {
tracing::debug!(
"Layer {} present at {}, size {}",
layer.name,
on_disk.local_path,
meta.len(),
);
}
Err(e) => {
tracing::warn!(
"Layer {} not found at {} ({})",
layer.name,
on_disk.local_path,
e
);
debug_assert!(false);
}
}
}
if on_disk.metadata.generation_file_size() != on_disk.metadata.generation_file_size() {
tracing::info!(
"Re-downloading layer {} with changed size or generation: {:?}->{:?}",
layer.name,
on_disk.metadata.generation_file_size(),
on_disk.metadata.generation_file_size()
);
return LayerAction::Download;
}
if on_disk.metadata != layer.metadata || on_disk.access_time != layer.access_time {
// We already have this layer on disk. Update its access time.
tracing::debug!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
return LayerAction::Touch;
}
return LayerAction::NoAction;
} else {
tracing::debug!("Layer {} not present on disk yet", layer.name);
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
return LayerAction::Skip;
}
}
LayerAction::Download
}
async fn download_timeline(
&self,
timeline: HeatMapTimeline,

View File

@@ -33,6 +33,7 @@ use utils::sync::gate::GateGuard;
use utils::lsn::Lsn;
pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;

View File

@@ -87,6 +87,23 @@ impl BatchLayerWriter {
));
}
pub(crate) async fn finish(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<ResidentLayer>> {
let res = self
.finish_with_discard_fn(tline, ctx, |_| async { false })
.await?;
let mut output = Vec::new();
for r in res {
if let BatchWriterResult::Produced(layer) = r {
output.push(layer);
}
}
Ok(output)
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,

View File

@@ -70,6 +70,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::tenant::storage_layer::ImageLayerName;
use crate::{
aux_file::AuxFileSizeEstimator,
page_service::TenantManagerTypes,
@@ -78,7 +79,7 @@ use crate::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{
inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc,
inmemory_layer::IndexEntry, BatchLayerWriter, IoConcurrency, PersistentLayerDesc,
ValueReconstructSituation,
},
},
@@ -144,15 +145,19 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::upload_queue::NotInitialized;
use super::GcError;
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, MaybeOffloaded,
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
};
#[cfg(test)]
@@ -897,10 +902,17 @@ impl From<GetReadyAncestorError> for PageReconstructError {
}
}
pub(crate) enum WaitLsnTimeout {
Custom(Duration),
// Use the [`PageServerConf::wait_lsn_timeout`] default
Default,
}
pub(crate) enum WaitLsnWaiter<'a> {
Timeline(&'a Timeline),
Tenant,
PageService,
HttpEndpoint,
}
/// Argument to [`Timeline::shutdown`].
@@ -922,7 +934,7 @@ pub(crate) enum ShutdownMode {
}
struct ImageLayerCreationOutcome {
image: Option<ResidentLayer>,
unfinished_image_layer: Option<ImageLayerWriter>,
next_start_key: Key,
}
@@ -1297,6 +1309,7 @@ impl Timeline {
&self,
lsn: Lsn,
who_is_waiting: WaitLsnWaiter<'_>,
timeout: WaitLsnTimeout,
ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), WaitLsnError> {
let state = self.current_state();
@@ -1313,7 +1326,7 @@ impl Timeline {
| TaskKind::WalReceiverConnectionPoller => {
let is_myself = match who_is_waiting {
WaitLsnWaiter::Timeline(waiter) => Weak::ptr_eq(&waiter.myself, &self.myself),
WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService | WaitLsnWaiter::HttpEndpoint => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
};
if is_myself {
if let Err(current) = self.last_record_lsn.would_wait_for(lsn) {
@@ -1329,13 +1342,14 @@ impl Timeline {
}
}
let timeout = match timeout {
WaitLsnTimeout::Custom(t) => t,
WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
};
let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
match self
.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.await
{
match self.last_record_lsn.wait_for_timeout(lsn, timeout).await {
Ok(()) => Ok(()),
Err(e) => {
use utils::seqwait::SeqWaitError::*;
@@ -2167,9 +2181,17 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.compaction_upper_limit
.unwrap_or(self.conf.default_tenant_conf.compaction_upper_limit)
}
fn get_l0_flush_delay_threshold(&self) -> Option<usize> {
// Default to delay L0 flushes at 3x compaction threshold.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 3;
// Disable L0 flushes by default. This and compaction needs further tuning.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 0; // TODO: default to e.g. 3
// If compaction is disabled, don't delay.
if self.get_compaction_period() == Duration::ZERO {
@@ -2197,10 +2219,9 @@ impl Timeline {
}
fn get_l0_flush_stall_threshold(&self) -> Option<usize> {
// Default to stall L0 flushes at 5x compaction threshold.
// TODO: stalls are temporarily disabled by default, see below.
#[allow(unused)]
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 5;
// Disable L0 stalls by default. In ingest benchmarks, we see image compaction take >10
// minutes, blocking L0 compaction, and we can't stall L0 flushes for that long.
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 0; // TODO: default to e.g. 5
// If compaction is disabled, don't stall.
if self.get_compaction_period() == Duration::ZERO {
@@ -2232,13 +2253,8 @@ impl Timeline {
return None;
}
// Disable stalls by default. In ingest benchmarks, we see image compaction take >10
// minutes, blocking L0 compaction, and we can't stall L0 flushes for that long.
//
// TODO: fix this.
// let l0_flush_stall_threshold = l0_flush_stall_threshold
// .unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
let l0_flush_stall_threshold = l0_flush_stall_threshold?;
let l0_flush_stall_threshold = l0_flush_stall_threshold
.unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
// 0 disables backpressure.
if l0_flush_stall_threshold == 0 {
@@ -2252,6 +2268,14 @@ impl Timeline {
Some(max(l0_flush_stall_threshold, compaction_threshold))
}
fn get_l0_flush_wait_upload(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.l0_flush_wait_upload
.unwrap_or(self.conf.default_tenant_conf.l0_flush_wait_upload)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -3584,7 +3608,12 @@ impl Timeline {
}
}
ancestor
.wait_lsn(self.ancestor_lsn, WaitLsnWaiter::Timeline(self), ctx)
.wait_lsn(
self.ancestor_lsn,
WaitLsnWaiter::Timeline(self),
WaitLsnTimeout::Default,
ctx,
)
.await
.map_err(|e| match e {
e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
@@ -4034,6 +4063,27 @@ impl Timeline {
// release lock on 'layers'
};
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
// TODO: remove this, and rely on l0_flush_{delay,stall}_threshold instead.
if self.get_l0_flush_wait_upload() {
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);
}
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
@@ -4364,11 +4414,15 @@ impl Timeline {
if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("created image layer for rel {}", image_layer.local_path());
info!(
"produced image layer for rel {}",
ImageLayerName {
key_range: img_range.clone(),
lsn
},
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
unfinished_image_layer: Some(image_layer_writer),
next_start_key: img_range.end,
})
} else {
@@ -4378,7 +4432,7 @@ impl Timeline {
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome {
image: None,
unfinished_image_layer: None,
next_start_key: start,
})
}
@@ -4427,7 +4481,7 @@ impl Timeline {
if !trigger_generation && mode == ImageLayerCreationMode::Try {
return Ok(ImageLayerCreationOutcome {
image: None,
unfinished_image_layer: None,
next_start_key: img_range.end,
});
}
@@ -4453,14 +4507,15 @@ impl Timeline {
if wrote_any_image {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!(
"created image layer for metadata {}",
image_layer.local_path()
ImageLayerName {
key_range: img_range.clone(),
lsn
}
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
unfinished_image_layer: Some(image_layer_writer),
next_start_key: img_range.end,
})
} else {
@@ -4470,7 +4525,7 @@ impl Timeline {
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome {
image: None,
unfinished_image_layer: None,
next_start_key: start,
})
}
@@ -4537,7 +4592,6 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();
// We need to avoid holes between generated image layers.
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
@@ -4552,6 +4606,8 @@ impl Timeline {
let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
for partition in partitioning.parts.iter() {
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
@@ -4624,45 +4680,45 @@ impl Timeline {
.map_err(|_| CreateImageLayersError::Cancelled)?,
);
if !compact_metadata {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_rel_blocks(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
start,
io_concurrency,
)
.await?;
start = next_start_key;
image_layers.extend(image);
let ImageLayerCreationOutcome {
unfinished_image_layer,
next_start_key,
} = if !compact_metadata {
self.create_image_layer_for_rel_blocks(
partition,
image_layer_writer,
lsn,
ctx,
img_range.clone(),
start,
io_concurrency,
)
.await?
} else {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_metadata_keys(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
mode,
start,
io_concurrency,
)
.await?;
start = next_start_key;
image_layers.extend(image);
self.create_image_layer_for_metadata_keys(
partition,
image_layer_writer,
lsn,
ctx,
img_range.clone(),
mode,
start,
io_concurrency,
)
.await?
};
start = next_start_key;
if let Some(unfinished_image_layer) = unfinished_image_layer {
batch_image_writer.add_unfinished_image_writer(
unfinished_image_layer,
img_range,
lsn,
);
}
}
let image_layers = batch_image_writer.finish(self, ctx).await?;
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right

View File

@@ -47,9 +47,7 @@ use crate::tenant::timeline::{ImageLayerCreationOutcome, IoConcurrency};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
@@ -1117,14 +1115,7 @@ impl Timeline {
// Under normal circumstances, we will accumulate up to compaction_interval L0s of size
// checkpoint_distance each. To avoid edge cases using extra system resources, bound our
// work in this function to only operate on this much delta data at once.
//
// Take the max of the configured value & the default, so that tests that configure tiny values
// can still use a sensible amount of memory, but if a deployed system configures bigger values we
// still let them compact a full stack of L0s in one go.
let delta_size_limit = std::cmp::max(
self.get_compaction_threshold(),
DEFAULT_COMPACTION_THRESHOLD,
) as u64
let delta_size_limit = self.get_compaction_upper_limit() as u64
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
let mut fully_compacted = true;
@@ -3197,7 +3188,7 @@ impl TimelineAdaptor {
// TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
let start = Key::MIN;
let ImageLayerCreationOutcome {
image,
unfinished_image_layer,
next_start_key: _,
} = self
.timeline
@@ -3212,7 +3203,10 @@ impl TimelineAdaptor {
)
.await?;
if let Some(image_layer) = image {
if let Some(image_layer_writer) = unfinished_image_layer {
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);
}

View File

@@ -113,7 +113,7 @@ pub async fn doit(
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefintely waiting for pgdata to finish");
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
.await
.is_ok()

View File

@@ -308,7 +308,7 @@ impl ControlFile {
202107181 => 14,
202209061 => 15,
202307071 => 16,
/* XXX pg17 */
202406281 => 17,
catversion => {
anyhow::bail!("unrecognized catalog version {catversion}")
}

View File

@@ -164,9 +164,10 @@ pub(super) async fn connection_manager_loop_step(
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
@@ -273,7 +274,7 @@ pub(super) async fn connection_manager_loop_step(
};
last_discovery_ts = Some(std::time::Instant::now());
debug!("No active connection and no candidates, sending discovery request to the broker");
info!("No active connection and no candidates, sending discovery request to the broker");
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because

View File

@@ -118,7 +118,7 @@ pub(super) async fn handle_walreceiver_connection(
cancellation: CancellationToken,
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
safekeeper_node: NodeId,
ingest_batch_size: u64,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -140,7 +140,7 @@ pub(super) async fn handle_walreceiver_connection(
let (replication_client, connection) = {
let mut config = wal_source_connconf.to_tokio_postgres_config();
config.application_name(format!("pageserver-{}", node.0).as_str());
config.application_name(format!("pageserver-{}", timeline.conf.id.0).as_str());
config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
Ok(client_and_conn) => client_and_conn?,
@@ -162,7 +162,7 @@ pub(super) async fn handle_walreceiver_connection(
latest_wal_update: Utc::now().naive_utc(),
streaming_lsn: None,
commit_lsn: None,
node,
node: safekeeper_node,
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");

View File

@@ -480,7 +480,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & ((uint32)1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock);
return found;
@@ -527,7 +527,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
if ((entry->bitmap[chunk_offs >> 5] &
(1 << (chunk_offs & 31))) != 0)
((uint32)1 << (chunk_offs & 31))) != 0)
{
BITMAP_SET(bitmap, i);
found++;
@@ -620,7 +620,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
}
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
entry->bitmap[chunk_offs >> 5] &= ~((uint32)1 << (chunk_offs & (32 - 1)));
if (entry->access_count == 0)
{
@@ -774,7 +774,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* If the page is valid, we consider it "read".
* All other pages will be fetched separately by the next cache
*/
if (entry->bitmap[(chunk_offs + i) / 32] & (1 << ((chunk_offs + i) % 32)))
if (entry->bitmap[(chunk_offs + i) / 32] & ((uint32)1 << ((chunk_offs + i) % 32)))
{
BITMAP_SET(mask, buf_offset + i);
iteration_hits++;
@@ -1034,7 +1034,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
lfc_ctl->used_pages += 1 - ((entry->bitmap[(chunk_offs + i) >> 5] >> ((chunk_offs + i) & 31)) & 1);
entry->bitmap[(chunk_offs + i) >> 5] |=
(1 << ((chunk_offs + i) & 31));
((uint32)1 << ((chunk_offs + i) & 31));
}
}
@@ -1282,7 +1282,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
if (entry->bitmap[i >> 5] & ((uint32)1 << (i & 31)))
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));

View File

@@ -1024,7 +1024,8 @@ DetermineEpochStartLsn(WalProposer *wp)
dth = &wp->safekeeper[wp->donor].voteResponse.termHistory;
wp->propTermHistory.n_entries = dth->n_entries + 1;
wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries);
memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries);
if (dth->n_entries > 0)
memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries);
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn;

View File

@@ -423,11 +423,11 @@ async fn upload_parquet(
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("request_data_upload")
.with_context(|| format!("request_data_upload: path={path}"))
.err();
if let Some(err) = maybe_err {
tracing::error!(%id, error = ?err, "failed to upload request data");
tracing::error!(%id, %path, error = ?err, "failed to upload request data");
}
Ok(buffer.writer())

View File

@@ -396,13 +396,13 @@ async fn upload_backup_events(
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_UPLOAD_MAX_RETRIES,
"request_data_upload",
"usage_metrics_upload",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("request_data_upload")?;
.with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
Ok(())
}

View File

@@ -2,8 +2,9 @@ use pageserver_api::{
models::{
detach_ancestor::AncestorDetached, LocationConfig, LocationConfigListResponse,
PageserverUtilization, SecondaryProgress, TenantScanRemoteStorageResponse,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineArchivalConfigRequest,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
TenantShardSplitRequest, TenantShardSplitResponse, TenantWaitLsnRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest,
TopTenantShardsResponse,
},
shard::TenantShardId,
};
@@ -299,4 +300,17 @@ impl PageserverClient {
self.inner.top_tenant_shards(request).await
)
}
pub(crate) async fn wait_lsn(
&self,
tenant_shard_id: TenantShardId,
request: TenantWaitLsnRequest,
) -> Result<StatusCode> {
measured_request!(
"wait_lsn",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.wait_lsn(tenant_shard_id, request).await
)
}
}

View File

@@ -3,7 +3,7 @@ use crate::persistence::Persistence;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
@@ -348,6 +348,32 @@ impl Reconciler {
Ok(())
}
async fn wait_lsn(
&self,
node: &Node,
tenant_shard_id: TenantShardId,
timelines: HashMap<TimelineId, Lsn>,
) -> Result<StatusCode, ReconcileError> {
const TIMEOUT: Duration = Duration::from_secs(10);
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.jwt_token.as_deref(),
);
client
.wait_lsn(
tenant_shard_id,
TenantWaitLsnRequest {
timelines,
timeout: TIMEOUT,
},
)
.await
.map_err(|e| e.into())
}
async fn get_lsns(
&self,
tenant_shard_id: TenantShardId,
@@ -461,6 +487,39 @@ impl Reconciler {
node: &Node,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
// Signal to the pageserver that it should ingest up to the baseline LSNs.
loop {
match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await {
Ok(StatusCode::OK) => {
// Everything is caught up
return Ok(());
}
Ok(StatusCode::ACCEPTED) => {
// Some timelines are not caught up yet.
// They'll be polled below.
break;
}
Ok(StatusCode::NOT_FOUND) => {
// None of the timelines are present on the pageserver.
// This is correct if they've all been deleted, but
// let let the polling loop below cross check.
break;
}
Ok(status_code) => {
tracing::warn!(
"Unexpected status code ({status_code}) returned by wait_lsn endpoint"
);
break;
}
Err(e) => {
tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})",);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
}
}
// Poll the LSNs until they catch up
loop {
let latest = match self.get_lsns(tenant_shard_id, node).await {
Ok(l) => l,

View File

@@ -3275,9 +3275,10 @@ impl Service {
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline {}/{}",
"Creating timeline {}/{}, mode={:?}",
tenant_id,
create_req.new_timeline_id,
create_req.mode
);
let _tenant_lock = trace_shared_lock(

View File

@@ -165,6 +165,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

View File

@@ -31,6 +31,7 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("set log_statement = 'all'")
cur.execute("create table t(x integer)")
for _ in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")

View File

@@ -139,8 +139,10 @@ def test_fully_custom_config(positive_env: NeonEnv):
fully_custom_config = {
"compaction_period": "1h",
"compaction_threshold": 13,
"compaction_upper_limit": 100,
"l0_flush_delay_threshold": 25,
"l0_flush_stall_threshold": 42,
"l0_flush_wait_upload": True,
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",

View File

@@ -19,7 +19,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError
# Test branch creation
@@ -177,8 +176,11 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
env.neon_cli.mappings_map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
env.endpoints.create_start(
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
)
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
finally:
env.pageserver.stop(immediate=True)
@@ -219,7 +221,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
branch_id = TimelineId.generate()
with pytest.raises(RetryError, match="too many 503 error responses"):
with pytest.raises(
PageserverApiException,
match="Cannot branch off the timeline that's not present in pageserver",
):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,

View File

@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, cast
import pytest
from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS
from fixtures.metrics import parse_metrics
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@@ -33,6 +34,17 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d
migration_id = cast("int", cur.fetchall()[0][0])
assert migration_id == i - 1
# Check that migration failure is properly recorded in the metrics
client = endpoint.http_client()
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
failed_migration = metrics.query_all(
"compute_ctl_db_migration_failed_total",
)
assert len(failed_migration) == 1
for sample in failed_migration:
assert sample.value == 1
endpoint.stop()
endpoint.start()

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
@@ -128,6 +129,17 @@ def test_remote_extensions(
httpserver.check()
# Check that we properly recorded downloads in the metrics
client = endpoint.http_client()
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
remote_ext_requests = metrics.query_all(
"compute_ctl_remote_ext_requests_total",
)
assert len(remote_ext_requests) == 1
for sample in remote_ext_requests:
assert sample.value == 1
# TODO
# 1. Test downloading remote library.
@@ -137,7 +149,7 @@ def test_remote_extensions(
#
# 3.Test that extension is downloaded after endpoint restart,
# when the library is used in the query.
# Run the test with mutliple simultaneous connections to an endpoint.
# Run the test with multiple simultaneous connections to an endpoint.
# to ensure that the extension is downloaded only once.
#
# 4. Test that private extensions are only downloaded when they are present in the spec.

View File

@@ -14,10 +14,8 @@ from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import run_only_on_postgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -39,10 +37,6 @@ smoke_params = [
]
@run_only_on_postgres(
[PgVersion.V14, PgVersion.V15, PgVersion.V16],
"newer control file catalog version and struct format isn't supported",
)
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres,
@@ -117,13 +111,15 @@ def test_pgdata_import_smoke(
# TODO: would be nicer to just compare pgdump
# Enable IO concurrency for batching on large sequential scan, to avoid making
# this test unnecessarily onerous on CPU
# this test unnecessarily onerous on CPU. Especially on debug mode, it's still
# pretty onerous though, so increase statement_timeout to avoid timeouts.
assert ep.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [(expect_nrows, expect_sum)]]
) == [[], [], [(expect_nrows, expect_sum)]]
validate_vanilla_equivalence(vanilla_pg)
@@ -156,7 +152,7 @@ def test_pgdata_import_smoke(
)
timeline_id = TimelineId.generate()
log.info("starting import")
log.info(f"starting import (tenant {tenant_id} timeline {timeline_id})")
start = time.monotonic()
idempotency = ImportPgdataIdemptencyKey.random()
@@ -176,6 +172,7 @@ def test_pgdata_import_smoke(
},
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
time.sleep(10)
while True:
locations = env.storage_controller.locate(tenant_id)
@@ -317,10 +314,6 @@ def test_pgdata_import_smoke(
br_initdb_endpoint.safe_psql("select * from othertable")
@run_only_on_postgres(
[PgVersion.V14, PgVersion.V15, PgVersion.V16],
"newer control file catalog version and struct format isn't supported",
)
def test_fast_import_binary(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -786,6 +786,54 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
def test_paused_upload_stalls_checkpoint(
neon_env_builder: NeonEnvBuilder,
):
"""
This test checks that checkpoints block on uploads to remote storage.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
initial_tenant_conf={
# Set a small compaction threshold
"compaction_threshold": "3",
# Disable GC
"gc_period": "0s",
# disable PITR
"pitr_interval": "0s",
}
)
env.pageserver.allowed_errors.append(
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client = env.pageserver.http_client()
layers_at_creation = client.layer_map_info(tenant_id, timeline_id)
deltas_at_creation = len(layers_at_creation.delta_layers())
assert (
deltas_at_creation == 1
), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle"
# Make new layer uploads get stuck.
# Note that timeline creation waits for the initial layers to reach remote storage.
# So at this point, the `layers_at_creation` are in remote storage.
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
# Build two tables with some data inside
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with pytest.raises(ReadTimeout):
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
client.configure_failpoints(("before-upload-layer-pausable", "off"))
def wait_upload_queue_empty(
client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.2",
"46f9b96555e084c35dd975da9485996db9e86181"
"b654fa88b6fd2ad24a03a14a7cd417ec66e518f9"
],
"v16": [
"16.6",