Compare commits


1 Commit

Author: Heikki Linnakangas
SHA1: f011af063d
Message: Increase timeout when stopping a service in tests.
I've seen a few test failures with error:

    pageserver stop failed: pageserver with pid 115851 did not stop in 10 seconds

These have all been with tests that use real S3. Pageserver shutdown
waits for all in-memory layers to be flushed to disk and uploaded to
remote storage, so I think it's reasonable that that might take longer
than 10 s if there's some kind of a network hiccup.
Date: 2023-05-14 17:51:14 +03:00
47 changed files with 744 additions and 1106 deletions

View File

@@ -14,4 +14,3 @@ opt-level = 1
[alias]
build_testing = ["build", "--features", "testing"]
neon = ["run", "--bin", "neon_local"]

View File

@@ -147,8 +147,6 @@ runs:
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY}
- name: Release lock
if: always()
shell: bash -euxo pipefail {0}

View File

@@ -48,10 +48,6 @@ inputs:
description: 'Whether to rerun flaky tests'
required: false
default: 'false'
pg_version:
description: 'Postgres version to use for tests'
required: false
default: 'v14'
runs:
using: "composite"
@@ -72,7 +68,7 @@ runs:
prefix: latest
- name: Download compatibility snapshot for Postgres 14
if: inputs.build_type != 'remote' && inputs.pg_version == 'v14'
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
@@ -110,14 +106,13 @@ runs:
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
PG_VERSION: ${{ inputs.pg_version }}
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report
# and it is needed to distinguish different environments
export PLATFORM=${PLATFORM:-github-actions-selfhosted}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export DEFAULT_PG_VERSION=${DEFAULT_PG_VERSION:-14}
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1
@@ -198,7 +193,7 @@ runs:
fi
- name: Upload compatibility snapshot for Postgres 14
if: github.ref_name == 'release' && inputs.pg_version == 'v14'
if: github.ref_name == 'release'
uses: ./.github/actions/upload
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
@@ -211,4 +206,4 @@ runs:
uses: ./.github/actions/allure-report-store
with:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}
unique-key: ${{ inputs.test_selection }}-${{ inputs.build_type }}

View File

@@ -1,6 +1,6 @@
## Problem
## Describe your changes
## Summary of changes
## Issue ticket number and link
## Checklist before requesting a review

View File

@@ -16,12 +16,12 @@ on:
workflow_dispatch: # adds ability to run this manually
inputs:
region_id:
description: 'Project region id. If not set, the default region will be used'
description: 'Use a particular region. If not set the default region will be used'
required: false
default: 'aws-us-east-2'
save_perf_report:
type: boolean
description: 'Publish perf report. If not set, the report will be published only for the main branch'
description: 'Publish perf report or not. If not set, the report is published only for the main branch'
required: false
defaults:
@@ -125,14 +125,13 @@ jobs:
matrix='{
"platform": [
"neon-captest-new",
"neon-captest-reuse",
"neonvm-captest-new"
"neon-captest-reuse"
],
"db_size": [ "10gb" ],
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" },
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "platform": "neonvm-captest-new", "db_size": "50gb" }]
"include": [
{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" }
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
@@ -198,7 +197,7 @@ jobs:
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -206,7 +205,6 @@ jobs:
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}
- name: Set up Connection String
id: set-up-connstr
@@ -215,7 +213,7 @@ jobs:
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
neon-captest-new | neon-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -225,7 +223,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac

View File

@@ -324,8 +324,7 @@ jobs:
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
# Default shared memory is 64mb
options: --init --shm-size=512mb
options: --init
needs: [ build-neon ]
strategy:
fail-fast: false
@@ -351,8 +350,8 @@ jobs:
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
rerun_flaky: true
pg_version: ${{ matrix.pg_version }}
env:
DEFAULT_PG_VERSION: ${{ matrix.pg_version }}
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
@@ -364,8 +363,7 @@ jobs:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
# Default shared memory is 64mb
options: --init --shm-size=512mb
options: --init
needs: [ build-neon ]
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
strategy:
@@ -492,43 +490,37 @@ jobs:
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Build coverage report
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
- name: Build and upload coverage report
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--commit-url=$COMMIT_URL \
--format=github
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${{ github.repository_owner }}.github.io/zenith-coverage-data/$COMMIT_SHA
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
scripts/git-upload \
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
- uses: actions/github-script@v6
env:
REPORT_URL: ${{ steps.upload-coverage-report.outputs.report-url }}
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
with:
script: |
const { REPORT_URL, COMMIT_SHA } = process.env
await github.rest.repos.createCommitStatus({
owner: context.repo.owner,
repo: context.repo.repo,
sha: `${COMMIT_SHA}`,
state: 'success',
target_url: `${REPORT_URL}`,
context: 'Code coverage report',
})
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"success\",
\"context\": \"neon-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
trigger-e2e-tests:
runs-on: [ self-hosted, gen3, small ]

View File

@@ -2,7 +2,7 @@
Howdy! Usual good software engineering practices apply. Write
tests. Write comments. Follow standard Rust coding practices where
possible. Use `cargo fmt` and `cargo clippy` to tidy up formatting.
possible. Use 'cargo fmt' and 'clippy' to tidy up formatting.
There are soft spots in the code, which could use cleanup,
refactoring, additional comments, and so forth. Let's try to raise the

Cargo.lock (generated, 313 changed lines)
View File

@@ -230,38 +230,40 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc00553f5f3c06ffd4510a9d576f92143618706c45ea6ff81e84ad9be9588abd"
checksum = "56a636c44c77fa18bdba56126a34d30cfe5538fe88f7d34988fa731fee143ddd"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-sdk-sso",
"aws-sdk-sts",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-async 0.51.0",
"aws-smithy-client 0.51.0",
"aws-smithy-http 0.51.0",
"aws-smithy-http-tower 0.51.0",
"aws-smithy-json",
"aws-smithy-types",
"aws-types",
"aws-smithy-types 0.51.0",
"aws-types 0.51.0",
"bytes",
"fastrand",
"hex",
"http",
"hyper",
"ring",
"time",
"tokio",
"tower",
"tracing",
"zeroize",
]
[[package]]
name = "aws-credential-types"
version = "0.55.2"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cb57ac6088805821f78d282c0ba8aec809f11cbee10dda19a97b03ab040ccc2"
checksum = "f4232d3729eefc287adc0d5a8adc97b7d94eefffe6bbe94312cc86c7ab6b06ce"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
"aws-smithy-async 0.55.1",
"aws-smithy-types 0.55.1",
"fastrand",
"tokio",
"tracing",
@@ -270,13 +272,13 @@ dependencies = [
[[package]]
name = "aws-endpoint"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c5f6f84a4f46f95a9bb71d9300b73cd67eb868bc43ae84f66ad34752299f4ac"
checksum = "6ca8f374874f6459aaa88dc861d7f5d834ca1ff97668eae190e97266b5f6c3fb"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"aws-smithy-http 0.51.0",
"aws-smithy-types 0.51.0",
"aws-types 0.51.0",
"http",
"regex",
"tracing",
@@ -284,14 +286,13 @@ dependencies = [
[[package]]
name = "aws-http"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a754683c322f7dc5167484266489fdebdcd04d26e53c162cad1f3f949f2c5671"
checksum = "78d41e19e779b73463f5f0c21b3aacc995f4ba783ab13a7ae9f5dfb159a551b4"
dependencies = [
"aws-credential-types",
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"aws-smithy-http 0.51.0",
"aws-smithy-types 0.51.0",
"aws-types 0.51.0",
"bytes",
"http",
"http-body",
@@ -303,104 +304,127 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "0.25.1"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "392b9811ca489747ac84349790e49deaa1f16631949e7dd4156000251c260eae"
checksum = "a9f08665c8e03aca8cb092ef01e617436ebfa977fddc1240e1b062488ab5d48a"
dependencies = [
"aws-credential-types",
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-sigv4",
"aws-smithy-async",
"aws-smithy-async 0.51.0",
"aws-smithy-checksums",
"aws-smithy-client",
"aws-smithy-client 0.51.0",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-json",
"aws-smithy-types",
"aws-smithy-http 0.51.0",
"aws-smithy-http-tower 0.51.0",
"aws-smithy-types 0.51.0",
"aws-smithy-xml",
"aws-types",
"aws-types 0.51.0",
"bytes",
"bytes-utils",
"http",
"http-body",
"once_cell",
"percent-encoding",
"regex",
"tokio-stream",
"tower",
"tracing",
"url",
]
[[package]]
name = "aws-sdk-sso"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86dcb1cb71aa8763b327542ead410424515cff0cde5b753eedd2917e09c63734"
dependencies = [
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-smithy-async 0.51.0",
"aws-smithy-client 0.51.0",
"aws-smithy-http 0.51.0",
"aws-smithy-http-tower 0.51.0",
"aws-smithy-json",
"aws-smithy-types 0.51.0",
"aws-types 0.51.0",
"bytes",
"http",
"tokio-stream",
"tower",
]
[[package]]
name = "aws-sdk-sts"
version = "0.27.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0fbe3c2c342bc8dfea4bb43937405a8ec06f99140a0dcb9c7b59e54dfa93a1"
checksum = "fdfcf584297c666f6b472d5368a78de3bc714b6e0a53d7fbf76c3e347c292ab1"
dependencies = [
"aws-credential-types",
"aws-endpoint",
"aws-http",
"aws-sig-auth",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-json",
"aws-smithy-async 0.51.0",
"aws-smithy-client 0.51.0",
"aws-smithy-http 0.51.0",
"aws-smithy-http-tower 0.51.0",
"aws-smithy-query",
"aws-smithy-types",
"aws-smithy-types 0.51.0",
"aws-smithy-xml",
"aws-types",
"aws-types 0.51.0",
"bytes",
"http",
"regex",
"tower",
"tracing",
]
[[package]]
name = "aws-sig-auth"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84dc92a63ede3c2cbe43529cb87ffa58763520c96c6a46ca1ced80417afba845"
checksum = "12cbe7b2be9e185c1fbce27fc9c41c66b195b32d89aa099f98768d9544221308"
dependencies = [
"aws-credential-types",
"aws-sigv4",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-types",
"aws-smithy-http 0.51.0",
"aws-types 0.51.0",
"http",
"tracing",
]
[[package]]
name = "aws-sigv4"
version = "0.55.2"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "392fefab9d6fcbd76d518eb3b1c040b84728ab50f58df0c3c53ada4bea9d327e"
checksum = "5c0b2658d2cb66dbf02f0e8dee80810ef1e0ca3530ede463e0ef994c301087d1"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-http 0.51.0",
"bytes",
"form_urlencoded",
"hex",
"hmac",
"http",
"once_cell",
"percent-encoding",
"regex",
"sha2",
"ring",
"time",
"tracing",
]
[[package]]
name = "aws-smithy-async"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae23b9fe7a07d0919000116c4c5c0578303fbce6fc8d32efca1f7759d4c20faf"
checksum = "7b3442b4c5d3fc39891a2e5e625735fba6b24694887d49c6518460fde98247a9"
dependencies = [
"futures-util",
"pin-project-lite",
"tokio",
"tokio-stream",
]
[[package]]
name = "aws-smithy-async"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88573bcfbe1dcfd54d4912846df028b42d6255cbf9ce07be216b1bbfd11fc4b9"
dependencies = [
"futures-util",
"pin-project-lite",
@@ -410,12 +434,12 @@ dependencies = [
[[package]]
name = "aws-smithy-checksums"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6367acbd6849b8c7c659e166955531274ae147bf83ab4312885991f6b6706cb"
checksum = "cc227e36e346f45298288359f37123e1a92628d1cec6b11b5eb335553278bd9e"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-smithy-http 0.51.0",
"aws-smithy-types 0.51.0",
"bytes",
"crc32c",
"crc32fast",
@@ -431,14 +455,14 @@ dependencies = [
[[package]]
name = "aws-smithy-client"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5230d25d244a51339273b8870f0f77874cd4449fb4f8f629b21188ae10cfc0ba"
checksum = "ff28d553714f8f54cd921227934fc13a536a1c03f106e56b362fd57e16d450ad"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-http-tower",
"aws-smithy-types",
"aws-smithy-async 0.51.0",
"aws-smithy-http 0.51.0",
"aws-smithy-http-tower 0.51.0",
"aws-smithy-types 0.51.0",
"bytes",
"fastrand",
"http",
@@ -447,7 +471,26 @@ dependencies = [
"hyper-rustls",
"lazy_static",
"pin-project-lite",
"rustls 0.20.8",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "aws-smithy-client"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2f52352bae50d3337d5d6151b695d31a8c10ebea113eca5bead531f8301b067"
dependencies = [
"aws-smithy-async 0.55.1",
"aws-smithy-http 0.55.1",
"aws-smithy-http-tower 0.55.1",
"aws-smithy-types 0.55.1",
"bytes",
"fastrand",
"http",
"http-body",
"pin-project-lite",
"tokio",
"tower",
"tracing",
@@ -455,23 +498,23 @@ dependencies = [
[[package]]
name = "aws-smithy-eventstream"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22d2a2bcc16e5c4d949ffd2b851da852b9bbed4bb364ed4ae371b42137ca06d9"
checksum = "d7ea0df7161ce65b5c8ca6eb709a1a907376fa18226976e41c748ce02ccccf24"
dependencies = [
"aws-smithy-types",
"aws-smithy-types 0.51.0",
"bytes",
"crc32fast",
]
[[package]]
name = "aws-smithy-http"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b60e2133beb9fe6ffe0b70deca57aaeff0a35ad24a9c6fab2fd3b4f45b99fdb5"
checksum = "bf58ed4fefa61dbf038e5421a521cbc2c448ef69deff0ab1d915d8a10eda5664"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-types",
"aws-smithy-types 0.51.0",
"bytes",
"bytes-utils",
"futures-core",
@@ -488,13 +531,48 @@ dependencies = [
]
[[package]]
name = "aws-smithy-http-tower"
version = "0.55.2"
name = "aws-smithy-http"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a4d94f556c86a0dd916a5d7c39747157ea8cb909ca469703e20fee33e448b67"
checksum = "03bcc02d7ed9649d855c8ce4a735e9848d7b8f7568aad0504c158e3baa955df8"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-smithy-types 0.55.1",
"bytes",
"bytes-utils",
"futures-core",
"http",
"http-body",
"hyper",
"once_cell",
"percent-encoding",
"pin-project-lite",
"pin-utils",
"tracing",
]
[[package]]
name = "aws-smithy-http-tower"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20c96d7bd35e7cf96aca1134b2f81b1b59ffe493f7c6539c051791cbbf7a42d3"
dependencies = [
"aws-smithy-http 0.51.0",
"bytes",
"http",
"http-body",
"pin-project-lite",
"tower",
"tracing",
]
[[package]]
name = "aws-smithy-http-tower"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da88b3a860f65505996c29192d800f1aeb9480440f56d63aad33a3c12045017a"
dependencies = [
"aws-smithy-http 0.55.1",
"aws-smithy-types 0.55.1",
"bytes",
"http",
"http-body",
@@ -505,28 +583,40 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce3d6e6ebb00b2cce379f079ad5ec508f9bcc3a9510d9b9c1840ed1d6f8af39"
checksum = "d8324ba98c8a94187723cc16c37aefa09504646ee65c3d2c3af495bab5ea701b"
dependencies = [
"aws-smithy-types",
"aws-smithy-types 0.51.0",
]
[[package]]
name = "aws-smithy-query"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d58edfca32ef9bfbc1ca394599e17ea329cb52d6a07359827be74235b64b3298"
checksum = "83834ed2ff69ea6f6657baf205267dc2c0abe940703503a3e5d60ce23be3d306"
dependencies = [
"aws-smithy-types",
"aws-smithy-types 0.51.0",
"urlencoding",
]
[[package]]
name = "aws-smithy-types"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58db46fc1f4f26be01ebdb821751b4e2482cd43aa2b64a0348fb89762defaffa"
checksum = "8b02e06ea63498c43bc0217ea4d16605d4e58d85c12fc23f6572ff6d0a840c61"
dependencies = [
"itoa",
"num-integer",
"ryu",
"time",
]
[[package]]
name = "aws-smithy-types"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0afc731fd1417d791f9145a1e0c30e23ae0beaab9b4814017708ead2fc20f1"
dependencies = [
"base64-simd",
"itoa",
@@ -537,24 +627,40 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb557fe4995bd9ec87fb244bbb254666a971dc902a783e9da8b7711610e9664c"
checksum = "246e9f83dd1fdf5d347fa30ae4ad30a9d1d42ce4cd74a93d94afa874646f94cd"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "0.55.2"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de0869598bfe46ec44ffe17e063ed33336e59df90356ca8ff0e8da6f7c1d994b"
checksum = "05701d32da168b44f7ee63147781aed8723e792cc131cb9b18363b5393f17f70"
dependencies = [
"aws-smithy-async 0.51.0",
"aws-smithy-client 0.51.0",
"aws-smithy-http 0.51.0",
"aws-smithy-types 0.51.0",
"http",
"rustc_version",
"tracing",
"zeroize",
]
[[package]]
name = "aws-types"
version = "0.55.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9b082e329d9a304d39e193ad5c7ab363a0d6507aca6965e0673a746686fb0cc"
dependencies = [
"aws-credential-types",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-types",
"aws-smithy-async 0.55.1",
"aws-smithy-client 0.55.1",
"aws-smithy-http 0.55.1",
"aws-smithy-types 0.55.1",
"http",
"rustc_version",
"tracing",
@@ -3261,10 +3367,9 @@ dependencies = [
"anyhow",
"async-trait",
"aws-config",
"aws-credential-types",
"aws-sdk-s3",
"aws-smithy-http",
"aws-types",
"aws-smithy-http 0.51.0",
"aws-types 0.55.1",
"hyper",
"metrics",
"once_cell",

View File

@@ -21,10 +21,9 @@ anyhow = { version = "1.0", features = ["backtrace"] }
async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.25"
aws-smithy-http = "0.55"
aws-credential-types = "0.55"
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.21.0"
aws-smithy-http = "0.51.0"
aws-types = "0.55"
base64 = "0.13.0"
bincode = "1.3"

View File

@@ -415,23 +415,6 @@ RUN apt-get update && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/kq_imcx.control
#########################################################################################
#
# Layer "pg-cron-pg-build"
# compile pg_cron extension
#
#########################################################################################
FROM build-deps AS pg-cron-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.5.2.tar.gz -O pg_cron.tar.gz && \
echo "6f7f0980c03f1e2a6a747060e67bf4a303ca2a50e941e2c19daeed2b44dec744 pg_cron.tar.gz" | sha256sum --check && \
mkdir pg_cron-src && cd pg_cron-src && tar xvzf ../pg_cron.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_cron.control
#########################################################################################
#
# Layer "rust extensions"
@@ -546,7 +529,6 @@ COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -17,7 +17,7 @@ The Neon storage engine consists of two major components:
- Pageserver. Scalable storage backend for the compute nodes.
- Safekeepers. The safekeepers form a redundant WAL service that receives WAL from the compute node and stores it durably until it has been processed by the pageserver and uploaded to cloud storage.
See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more information.
See developer documentation in [/docs/SUMMARY.md](/docs/SUMMARY.md) for more information.
## Running local installation
@@ -130,11 +130,11 @@ Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (r
```sh
# Create repository in .neon with proper paths to binaries and data
# Later that would be responsibility of a package install script
> cargo neon init
> ./target/debug/neon_local init
Starting pageserver at '127.0.0.1:64000' in '.neon'.
# start pageserver, safekeeper, and broker for their intercommunication
> cargo neon start
> ./target/debug/neon_local start
Starting neon broker at 127.0.0.1:50051
storage_broker started, pid: 2918372
Starting pageserver at '127.0.0.1:64000' in '.neon'.
@@ -143,19 +143,19 @@ Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1'.
safekeeper 1 started, pid: 2918437
# create initial tenant and use it as a default for every future neon_local invocation
> cargo neon tenant create --set-default
> ./target/debug/neon_local tenant create --set-default
tenant 9ef87a5bf0d92544f6fafeeb3239695c successfully created on the pageserver
Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50 for tenant: 9ef87a5bf0d92544f6fafeeb3239695c
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> cargo neon endpoint start main
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
> cargo neon endpoint list
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
@@ -177,22 +177,22 @@ postgres=# select * from t;
3. And create branches and run postgres on them:
```sh
# create branch named migration_check
> cargo neon timeline branch --branch-name migration_check
> ./target/debug/neon_local timeline branch --branch-name migration_check
Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant: 9ef87a5bf0d92544f6fafeeb3239695c. Ancestor timeline: 'main'
# check branches tree
> cargo neon timeline list
> ./target/debug/neon_local timeline list
(L) main [de200bd42b49cc1814412c7e592dd6e9]
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
# start postgres on that branch
> cargo neon endpoint start migration_check --branch-name migration_check
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances
> cargo neon endpoint list
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running
@@ -221,7 +221,7 @@ postgres=# select * from t;
4. If you want to run tests afterward (see below), you must stop all the running pageserver, safekeeper, and postgres instances
you have just started. You can terminate them all with one command:
```sh
> cargo neon stop
> ./target/debug/neon_local stop
```
## Running tests
@@ -238,9 +238,9 @@ CARGO_BUILD_FLAGS="--features=testing" make
## Documentation
[docs](/docs) Contains a top-level overview of all available markdown documentation.
[/docs/](/docs/) Contains a top-level overview of all available markdown documentation.
- [sourcetree.md](/docs/sourcetree.md) contains an overview of the source tree layout.
- [/docs/sourcetree.md](/docs/sourcetree.md) contains an overview of the source tree layout.
To view your `rustdoc` documentation in a browser, try running `cargo doc --no-deps --open`
@@ -265,6 +265,6 @@ To get more familiar with this aspect, refer to:
## Join the development
- Read [CONTRIBUTING.md](/CONTRIBUTING.md) to learn about project code style and practices.
- To get familiar with a source tree layout, use [sourcetree.md](/docs/sourcetree.md).
- Read `CONTRIBUTING.md` to learn about project code style and practices.
- To get familiar with a source tree layout, use [/docs/sourcetree.md](/docs/sourcetree.md).
- To learn more about PostgreSQL internals, check http://www.interdb.jp/pg/index.html

View File

@@ -30,12 +30,12 @@ use utils::pid_file::{self, PidFileRead};
// These constants control the loop used to poll for process start / stop.
//
// The loop waits for at most 10 seconds, polling every 100 ms.
// The loop waits for at most 20 seconds, polling every 100 ms.
// Once a second, it prints a dot ("."), to give the user an indication that
// it's waiting. If the process hasn't started/stopped after 5 seconds,
// it prints a notice that it's taking long, but keeps waiting.
//
const RETRY_UNTIL_SECS: u64 = 10;
const RETRY_UNTIL_SECS: u64 = 20;
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
const RETRY_INTERVAL_MILLIS: u64 = 100;
const DOT_EVERY_RETRIES: u64 = 10;
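For orientation, here is a minimal sketch of the poll loop these constants describe, with a hypothetical `process_has_stopped()` check standing in for the real pid-file logic; with the new values the loop makes up to 20_000 / 100 = 200 polls:

```rust
use std::{thread, time::Duration};

const RETRY_UNTIL_SECS: u64 = 20;
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS; // 200
const RETRY_INTERVAL_MILLIS: u64 = 100;
const DOT_EVERY_RETRIES: u64 = 10;

/// Returns true if the process stopped within the budget, false on timeout.
fn wait_until_stopped(process_has_stopped: impl Fn() -> bool) -> bool {
    for retry in 0..RETRIES {
        if process_has_stopped() {
            return true;
        }
        if retry > 0 && retry % DOT_EVERY_RETRIES == 0 {
            print!("."); // roughly one dot per second of waiting
        }
        thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
    }
    false // still running after RETRY_UNTIL_SECS seconds
}

fn main() {
    // Demo: a condition that "stops" after about one second.
    let start = std::time::Instant::now();
    let stopped = wait_until_stopped(|| start.elapsed() >= Duration::from_secs(1));
    println!("\nstopped in time: {stopped}");
}
```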

View File

@@ -134,7 +134,6 @@ pub struct Endpoint {
// port and address of the Postgres server
pub address: SocketAddr,
// postgres major version in the format: 14, 15, etc.
pg_version: u32,
// These are not part of the endpoint as such, but the environment
@@ -382,11 +381,6 @@ impl Endpoint {
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
// prefetching of blocks referenced in WAL doesn't make sense for us
// Neon hot standby ignores pages that are not in the shared_buffers
if self.pg_version >= 15 {
conf.append("recovery_prefetch", "off");
}
}
}

View File

@@ -8,7 +8,9 @@ use std::process::{Child, Command};
use std::{io, result};
use anyhow::{bail, Context};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -314,8 +316,8 @@ impl PageServerNode {
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
let config = models::TenantConfig {
let request = TenantCreateRequest {
new_tenant_id,
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
@@ -370,10 +372,6 @@ impl PageServerNode {
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
let request = models::TenantCreateRequest {
new_tenant_id,
config,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
@@ -394,9 +392,9 @@ impl PageServerNode {
}
pub fn tenant_config(&self, tenant_id: TenantId, settings: HashMap<&str, &str>) -> Result<()> {
let config = {
// Braces to make the diff easier to read
models::TenantConfig {
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&TenantConfigRequest {
tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>())
@@ -453,11 +451,7 @@ impl PageServerNode {
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
}
};
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&models::TenantConfigRequest { tenant_id, config })
})
.send()?
.error_from_body()?;
@@ -489,7 +483,7 @@ impl PageServerNode {
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)?
.json(&models::TimelineCreateRequest {
.json(&TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,

View File

@@ -4,11 +4,6 @@ The pageserver uses Tokio for handling concurrency. Everything runs in
Tokio tasks, although some parts are written in blocking style and use
spawn_blocking().
We currently use std blocking functions for disk I/O, however. The
current model is that we consider disk I/Os to be short enough that we
perform them while running in a Tokio task. Changing all the disk I/O
calls to async is a TODO.
Each Tokio task is tracked by the `task_mgr` module. It maintains a
registry of tasks, and which tenant or timeline they are operating
on.
@@ -26,86 +21,19 @@ also a `shutdown_watcher()` Future that can be used with `tokio::select!`
or similar, to wake up on shutdown.
### Async cancellation safety
### Sync vs async
In async Rust, futures can be "cancelled" at any await point, by
dropping the Future. For example, `tokio::select!` returns as soon as
one of the Futures returns, and drops the others. `tokio::timeout!` is
another example. In the Rust ecosystem, some functions are
cancellation-safe, meaning they can be safely dropped without
side-effects, while others are not. See documentation of
`tokio::select!` for examples.
We use async to wait for incoming data on network connections, and to
perform other long-running operations. For example, each WAL receiver
connection is handled by a tokio Task. Once a piece of WAL has been
received from the network, the task calls the blocking functions in
the Repository to process the WAL.
In the pageserver and safekeeper, async code is *not*
cancellation-safe by default. Unless otherwise marked, any async
function that you call cannot be assumed to be async
cancellation-safe, and must be polled to completion.
The core storage code in `layered_repository/` is synchronous, with
blocking locks and I/O calls. The current model is that we consider
disk I/Os to be short enough that we perform them while running in a
Tokio task. If that becomes a problem, we should use `spawn_blocking`
before entering the synchronous parts of the code, or switch to using
tokio I/O functions.
The downside of non-cancellation safe code is that you have to be very
careful when using `tokio::select!`, `tokio::timeout!`, and other such
functions that can cause a Future to be dropped. They can only be used
with functions that are explicitly documented to be cancellation-safe,
or you need to spawn a separate task to shield from the cancellation.
At the entry points to the code, we also take care to poll futures to
completion, or shield the rest of the code from surprise cancellations
by spawning a separate task. The code that handles incoming HTTP
requests, for example, spawns a separate task for each request,
because Hyper will drop the request-handling Future if the HTTP
connection is lost. (FIXME: our HTTP handlers do not do that
currently, but we should fix that. See [issue
3478](https://github.com/neondatabase/neon/issues/3478)).
#### How to cancel, then?
If our code is not cancellation-safe, how do you cancel long-running
tasks? Use CancellationTokens.
TODO: More details on that. And we have an ongoing discussion on what
to do if cancellations might come from multiple sources.
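A minimal sketch of that pattern, assuming `tokio` and `tokio-util` as dependencies (hypothetical worker, not pageserver code): the task checks the token at points where stopping is safe, instead of being dropped in the middle of an await.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn long_running(cancel: CancellationToken) {
    loop {
        if cancel.is_cancelled() {
            // Deliberate cleanup happens here, then we return.
            break;
        }
        // ... one unit of work, awaiting as needed ...
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(long_running(cancel.clone()));
    tokio::time::sleep(Duration::from_millis(300)).await;
    cancel.cancel();     // request shutdown...
    task.await.unwrap(); // ...and still poll the task to completion
}
```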
#### Exceptions
Some library functions are cancellation-safe, and are explicitly marked
as such. For example, `utils::seqwait`.
#### Rationale
The alternative would be to make all async code cancellation-safe,
unless otherwise marked. That way, you could use `tokio::select!` more
liberally. The reasons we didn't choose that are explained in this
section.
Writing code in a cancellation-safe manner is tedious, as you need to
scrutinize every `.await` and ensure that if the `.await` call never
returns, the system is in a safe, consistent state. In some ways, you
need to do that with `?` and early `returns`, too, but `.await`s are
easier to miss. It is also easier to perform cleanup tasks when a
function returns an `Err` than when an `.await` simply never
returns. You can use `scopeguard` and Drop guards to perform cleanup
tasks, but it is more tedious. An `.await` that never returns is more
similar to a panic.
Note that even if you only use building blocks that themselves are
cancellation-safe, it doesn't mean that the code as a whole is
cancellation-safe. For example, consider the following code:
```
while let Some(i) = work_inbox.recv().await {
if let Err(_) = results_outbox.send(i).await {
println!("receiver dropped");
return;
}
}
```
It reads messages from one channel and sends them to another channel. If
this code is cancelled at the `results_outbox.send(i).await`, the
message read from the receiver is lost. That may or may not be OK,
depending on the context.
Another reason to not require cancellation-safety is historical: we
already had a lot of async code that was not scrutinized for
cancellation-safety when this issue was raised. Scrutinizing all
existing code is no fun.
Be very careful when mixing sync and async code!
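The text above mentions `spawn_blocking` as the escape hatch if the synchronous storage code starts blocking the executor for too long. A minimal sketch of that pattern, with a hypothetical `read_layer_file` helper (not actual pageserver code):

```rust
use tokio::task;

// Synchronous, blocking I/O: fine to run on Tokio's dedicated blocking pool.
fn read_layer_file(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    std::fs::read(path)
}

async fn handle_request(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    // Hand the blocking work to the blocking thread pool so executor threads
    // stay free to drive other async tasks.
    task::spawn_blocking(move || read_layer_file(path))
        .await
        .expect("blocking task panicked")
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let bytes = handle_request("Cargo.toml".into()).await?;
    println!("read {} bytes", bytes.len());
    Ok(())
}
```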

View File

@@ -136,20 +136,6 @@ pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<TenantId>,
#[serde(flatten)]
pub config: TenantConfig,
}
impl std::ops::Deref for TenantCreateRequest {
type Target = TenantConfig;
fn deref(&self) -> &Self::Target {
&self.config
}
}
#[derive(Serialize, Deserialize, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
@@ -196,21 +182,33 @@ impl TenantCreateRequest {
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: TenantConfig,
}
impl std::ops::Deref for TenantConfigRequest {
type Target = TenantConfig;
fn deref(&self) -> &Self::Target {
&self.config
}
#[serde(default)]
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilizied.
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
let config = TenantConfig {
TenantConfigRequest {
tenant_id,
checkpoint_distance: None,
checkpoint_timeout: None,
compaction_target_size: None,
@@ -227,8 +225,7 @@ impl TenantConfigRequest {
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
};
TenantConfigRequest { tenant_id, config }
}
}
}
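For readers following the `#[serde(flatten)]` side of this change, here is a standalone sketch (hypothetical field subset, assuming `serde` and `serde_json` as dependencies, not the real structs) of how flattening shapes the JSON; this is also why the OpenAPI spec elsewhere in this diff composes the request schemas with `allOf`:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default)]
struct TenantConfig {
    checkpoint_distance: Option<u64>,
    gc_horizon: Option<u64>,
}

#[derive(Serialize, Deserialize)]
struct TenantCreateRequest {
    new_tenant_id: Option<String>,
    // Flattening lifts the nested struct's fields to the top level of the JSON.
    #[serde(flatten)]
    config: TenantConfig,
}

fn main() {
    let req = TenantCreateRequest {
        new_tenant_id: Some("9ef87a5bf0d92544f6fafeeb3239695c".into()),
        config: TenantConfig {
            checkpoint_distance: Some(262144),
            gc_horizon: None,
        },
    };
    // The flattened fields appear at the top level, e.g.
    // {"new_tenant_id":"9ef8...","checkpoint_distance":262144,"gc_horizon":null}
    println!("{}", serde_json::to_string(&req).unwrap());
}
```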

View File

@@ -12,7 +12,6 @@ aws-smithy-http.workspace = true
aws-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
hyper = { workspace = true, features = ["stream"] }
serde.workspace = true
serde_json.workspace = true

View File

@@ -9,15 +9,14 @@ use std::sync::Arc;
use anyhow::Context;
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::{CredentialsProviderChain, LazyCachingCredentialsProvider},
};
use aws_credential_types::cache::CredentialsCache;
use aws_sdk_s3::{
config::{Config, Region},
error::SdkError,
operation::get_object::GetObjectError,
primitives::ByteStream,
Client,
config::Config,
error::{GetObjectError, GetObjectErrorKind},
types::{ByteStream, SdkError},
Client, Endpoint, Region,
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
@@ -126,23 +125,28 @@ impl S3Bucket {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
let env_creds = EnvironmentVariableCredentialsProvider::new();
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
let imds = ImdsCredentialsProvider::builder().build();
// finally add caching.
// this might change in future, see https://github.com/awslabs/aws-sdk-rust/issues/629
LazyCachingCredentialsProvider::builder()
.load(CredentialsProviderChain::first_try("env", env_creds).or_else("imds", imds))
.build()
};
let mut config_builder = Config::builder()
.region(Region::new(aws_config.bucket_region.clone()))
.credentials_cache(CredentialsCache::lazy())
.credentials_provider(credentials_provider);
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
config_builder = config_builder
.endpoint_url(custom_endpoint)
.force_path_style(true);
let endpoint = Endpoint::immutable(
custom_endpoint
.parse()
.expect("Failed to parse S3 custom endpoint"),
);
config_builder.set_endpoint_resolver(Some(Arc::new(endpoint)));
}
let client = Client::from_conf(config_builder.build());
@@ -225,9 +229,14 @@ impl S3Bucket {
))),
})
}
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
Err(DownloadError::NotFound)
}
Err(SdkError::ServiceError {
err:
GetObjectError {
kind: GetObjectErrorKind::NoSuchKey(..),
..
},
..
}) => Err(DownloadError::NotFound),
Err(e) => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(

View File

@@ -144,8 +144,6 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
///
/// This function is async cancellation-safe.
pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
@@ -161,8 +159,6 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
///
/// This function is async cancellation-safe.
pub async fn wait_for_timeout(
&self,
num: V,

View File

@@ -88,7 +88,3 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false
[[bench]]
name = "bench_disk_lookup"
harness = false

View File

@@ -1,133 +0,0 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pageserver::{tenant::{disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}, block_io::{BlockBuf, FileBlockReader}, storage_layer::DeltaLayerWriter}, repository::Key, virtual_file::{VirtualFile, self}, page_cache};
use std::{time::Instant, collections::BTreeMap};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
use std::{io::{Read, Write}, path::PathBuf};
use pageserver::config::PageServerConf;
struct MockLayer {
pub path: PathBuf,
pub index_start_blk: u32,
pub index_root_blk: u32,
}
impl MockLayer {
fn read(&self, key: i128) -> Option<u64> {
// Read from disk btree
let file = FileBlockReader::new(VirtualFile::open(&self.path).unwrap());
let tree_reader = DiskBtreeReader::<_, 24>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let key: Key = Key::from_i128(key);
let mut key_bytes: [u8; 24] = [8u8; 24];
key.write_to_byte_slice(&mut key_bytes);
let mut result = None;
tree_reader.visit(&key_bytes, VisitDirection::Backwards, |key, value| {
if key == key_bytes {
result = Some(value);
}
return false
}).unwrap();
result
}
}
fn make_simple(n_keys: i128, name: &str) -> MockLayer {
let block_buf = BlockBuf::new();
let mut writer = DiskBtreeBuilder::<_, 24>::new(block_buf);
for i in 0..n_keys {
let key: Key = Key::from_i128(i);
let value: u64 = i as u64;
let mut key_bytes: [u8; 24] = [8u8; 24];
key.write_to_byte_slice(&mut key_bytes);
writer.append(&key_bytes, value).unwrap();
}
let (index_root_blk, block_buf) = writer.finish().unwrap();
let index_start_blk = 0; // ???
let path = std::env::current_dir().unwrap()
.parent().unwrap()
.join("test_output")
.join("bench_disk_lookup")
.join("disk_btree")
.join(name);
std::fs::create_dir_all(path.clone().parent().unwrap()).unwrap();
let layer = MockLayer {
path: path.clone(),
index_start_blk,
index_root_blk,
};
let mut file = VirtualFile::create(&path).unwrap();
for buf in block_buf.blocks {
file.write_all(buf.as_ref()).unwrap();
}
layer
}
fn make_many(n_keys: i128, n_layers: i128) -> Vec<MockLayer> {
(0..n_layers)
.map(|i| make_simple(n_keys, &format!("layer_{}.tmp", i)))
.collect()
}
// cargo bench --bench bench_disk_lookup
fn bench_disk_lookup(c: &mut Criterion) {
virtual_file::init(10);
page_cache::init(10000);
// Results in a 40MB index
let n_keys = 4_000_000;
// One layer for each query
let n_layers = 100;
let n_queries = n_layers;
// Write to disk btree
let layers = make_many(n_keys, n_layers);
// Write to mem btrees
let mem_btrees: Vec<BTreeMap<i128, u64>> = (0..n_layers)
.map(|_| (0..n_keys)
.map(|i| (i as i128, i as u64))
.collect())
.collect();
// Pick queries
let rng = &mut StdRng::seed_from_u64(1);
let queries: Vec<_> = (0..n_keys).collect();
let queries: Vec<_> = queries.choose_multiple(rng, n_queries as usize).copied().collect();
// Define and name the benchmark function
let mut group = c.benchmark_group("g1");
group.bench_function("disk_btree", |b| {
b.iter(|| {
for (i, q) in queries.clone().into_iter().enumerate() {
black_box({
assert_eq!(layers[i].read(q), Some(q as u64));
})
}
});
});
group.bench_function("mem_btree", |b| {
b.iter(|| {
for (i, q) in queries.clone().into_iter().enumerate() {
black_box({
assert_eq!(mem_btrees[i].get(&q), Some(&(q as u64)));
})
}
});
});
group.finish();
}
criterion_group!(group_1, bench_disk_lookup);
criterion_main!(group_1);

View File

@@ -747,7 +747,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/TenantCreateRequest"
$ref: "#/components/schemas/TenantCreateInfo"
responses:
"201":
description: New tenant created successfully
@@ -794,7 +794,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/TenantConfigRequest"
$ref: "#/components/schemas/TenantConfigInfo"
responses:
"200":
description: OK
@@ -846,7 +846,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/TenantConfigResponse"
$ref: "#/components/schemas/TenantConfig"
"400":
description: Malformed get tenant config request
content:
@@ -909,27 +909,35 @@ components:
See the tenant `/attach` endpoint for more information.
type: string
enum: [ "maybe", "attached" ]
TenantCreateRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'
- type: object
properties:
new_tenant_id:
type: string
format: hex
TenantConfigRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'
- type: object
required:
- tenant_id
properties:
tenant_id:
type: string
format: hex
TenantConfig:
TenantCreateInfo:
type: object
properties:
new_tenant_id:
type: string
format: hex
tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
type: integer
pitr_interval:
type: string
checkpoint_distance:
type: integer
checkpoint_timeout:
type: string
compaction_period:
type: string
compaction_threshold:
type: string
TenantConfigInfo:
type: object
properties:
tenant_id:
type: string
format: hex
gc_period:
type: string
gc_horizon:
@@ -956,13 +964,13 @@ components:
type: integer
trace_read_requests:
type: boolean
TenantConfigResponse:
TenantConfig:
type: object
properties:
tenant_specific_overrides:
$ref: "#/components/schemas/TenantConfig"
$ref: "#/components/schemas/TenantConfigInfo"
effective_config:
$ref: "#/components/schemas/TenantConfig"
$ref: "#/components/schemas/TenantConfigInfo"
TimelineInfo:
type: object
required:

View File

@@ -19,7 +19,6 @@ use super::models::{
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::disk_usage_eviction_task;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
@@ -398,17 +397,9 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
let state = get_state(&request);
if let Some(remote_storage) = &state.remote_storage {
mgr::attach_tenant(
state.conf,
tenant_id,
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
TenantConfOpt::default(),
remote_storage.clone(),
&ctx,
)
.instrument(info_span!("tenant_attach", tenant = %tenant_id))
.await?;
mgr::attach_tenant(state.conf, tenant_id, remote_storage.clone(), &ctx)
.instrument(info_span!("tenant_attach", tenant = %tenant_id))
.await?;
} else {
return Err(ApiError::BadRequest(anyhow!(
"attach_tenant is not possible because pageserver was configured without remote storage"
@@ -717,17 +708,11 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
.expect("bug")
.start_timer();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let target_tenant_id = request_data
.new_tenant_id
@@ -758,7 +743,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
res.context("created tenant failed to become active")
.map_err(ApiError::InternalServerError)?;
}
json_response(
StatusCode::CREATED,
TenantCreateResponse(new_tenant.tenant_id()),
@@ -796,8 +780,7 @@ async fn update_tenant_config_handler(
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let tenant_conf = TenantConfOpt::try_from(&request_data).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)

View File

@@ -8,7 +8,6 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
use strum::VariantNames;
use strum_macros::{EnumVariantNames, IntoStaticStr};
use utils::id::{TenantId, TimelineId};
/// Prometheus histogram buckets (in seconds) for operations in the critical
@@ -25,33 +24,16 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
];
// Metrics collected on operations on the storage repository.
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub enum StorageTimeOperation {
#[strum(serialize = "layer flush")]
LayerFlush,
#[strum(serialize = "compact")]
Compact,
#[strum(serialize = "create images")]
CreateImages,
#[strum(serialize = "logical size")]
LogicalSize,
#[strum(serialize = "imitate logical size")]
ImitateLogicalSize,
#[strum(serialize = "load layer map")]
LoadLayerMap,
#[strum(serialize = "gc")]
Gc,
#[strum(serialize = "create tenant")]
CreateTenant,
}
const STORAGE_TIME_OPERATIONS: &[&str] = &[
"layer flush",
"compact",
"create images",
"init logical size",
"logical size",
"imitate logical size",
"load layer map",
"gc",
];
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
@@ -690,9 +672,7 @@ pub struct StorageTimeMetrics {
}
impl StorageTimeMetrics {
pub fn new(operation: StorageTimeOperation, tenant_id: &str, timeline_id: &str) -> Self {
let operation: &'static str = operation.into();
pub fn new(operation: &str, tenant_id: &str, timeline_id: &str) -> Self {
let timeline_sum = STORAGE_TIME_SUM_PER_TIMELINE
.get_metric_with_label_values(&[operation, tenant_id, timeline_id])
.unwrap();
@@ -756,23 +736,16 @@ impl TimelineMetrics {
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let flush_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id);
let compact_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::Compact, &tenant_id, &timeline_id);
let flush_time_histo = StorageTimeMetrics::new("layer flush", &tenant_id, &timeline_id);
let compact_time_histo = StorageTimeMetrics::new("compact", &tenant_id, &timeline_id);
let create_images_time_histo =
StorageTimeMetrics::new(StorageTimeOperation::CreateImages, &tenant_id, &timeline_id);
let logical_size_histo =
StorageTimeMetrics::new(StorageTimeOperation::LogicalSize, &tenant_id, &timeline_id);
let imitate_logical_size_histo = StorageTimeMetrics::new(
StorageTimeOperation::ImitateLogicalSize,
&tenant_id,
&timeline_id,
);
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
let imitate_logical_size_histo =
StorageTimeMetrics::new("imitate logical size", &tenant_id, &timeline_id);
let load_layer_map_histo =
StorageTimeMetrics::new(StorageTimeOperation::LoadLayerMap, &tenant_id, &timeline_id);
let garbage_collect_histo =
StorageTimeMetrics::new(StorageTimeOperation::Gc, &tenant_id, &timeline_id);
StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
@@ -840,7 +813,7 @@ impl Drop for TimelineMetrics {
.write()
.unwrap()
.remove(tenant_id, timeline_id);
for op in StorageTimeOperation::VARIANTS {
for op in STORAGE_TIME_OPERATIONS {
let _ =
STORAGE_TIME_SUM_PER_TIMELINE.remove_label_values(&[op, tenant_id, timeline_id]);
let _ =
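For context on the strum derives appearing on one side of this diff, a standalone sketch assuming strum/strum_macros 0.24-style APIs (not code from the repository):

```rust
use strum::VariantNames; // brings the `VARIANTS` constant into scope
use strum_macros::{EnumVariantNames, IntoStaticStr};

#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
enum StorageTimeOperation {
    #[strum(serialize = "layer flush")]
    LayerFlush,
    Compact,
    Gc,
}

fn main() {
    // IntoStaticStr: a variant converts into its &'static str label, which is
    // what `operation.into()` in the metrics constructor relies on.
    let label: &'static str = StorageTimeOperation::LayerFlush.into();
    assert_eq!(label, "layer flush");

    // EnumVariantNames: VARIANTS lists the variant names (serialized with the
    // same rules), letting the Drop impl iterate labels instead of keeping a
    // hand-maintained const array in sync.
    for op in StorageTimeOperation::VARIANTS {
        println!("{op}");
    }
}
```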

View File

@@ -80,7 +80,6 @@ use utils::{
mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub mod disk_persistent_bst;
pub(crate) mod ephemeral_file;
pub mod layer_map;
@@ -603,9 +602,12 @@ impl Tenant {
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let tenant_conf =
Self::load_tenant_config(conf, tenant_id).context("load tenant config")?;
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
let tenant_conf = TenantConfOpt::default();
Self::attach_idempotent_create_marker_file(conf, tenant_id)
.context("create attach marker file")?;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
@@ -642,6 +644,45 @@ impl Tenant {
Ok(tenant)
}
fn attach_idempotent_create_marker_file(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
// to determine whether a tenant has finished attaching.
let tenant_dir = conf.tenant_path(&tenant_id);
let marker_file = conf.tenant_attaching_mark_file_path(&tenant_id);
debug_assert_eq!(marker_file.parent().unwrap(), tenant_dir);
// TODO: should use tokio::fs here, but
// 1. caller is not async, for good reason (it holds tenants map lock)
// 2. we'd need to think about cancel safety. Turns out dropping a tokio::fs future
// doesn't wait for the activity in the fs thread pool.
crashsafe::create_dir_all(&tenant_dir).context("create tenant directory")?;
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&marker_file)
{
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
// Either this is a retry of attach or there is a concurrent task also doing attach for this tenant.
// We cannot distinguish this here.
// The caller is responsible for ensuring there's no concurrent attach for a tenant.
{} // fsync again, we don't know if that already happened
}
err => {
err.context("create tenant attaching marker file")?;
unreachable!("we covered the Ok() case above");
}
}
crashsafe::fsync_file_and_parent(&marker_file)
.context("fsync tenant attaching marker file and parent")?;
debug_assert!(tenant_dir.is_dir());
debug_assert!(marker_file.is_file());
Ok(())
}
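For reference, the marker-file step above boils down to a small crash-safety recipe: create the file with create_new so retries are detectable, then fsync both the file and the directory entry that names it. A minimal standalone sketch, using plain std::fs instead of the crate's crashsafe helpers and assuming Unix semantics (the helper name is made up):

    use std::fs::{self, File, OpenOptions};
    use std::io;
    use std::path::Path;

    // Hypothetical sketch: create `marker` inside `dir`, tolerate a retry that finds
    // the file already there, then fsync the file and its parent directory entry.
    fn create_marker_durably(dir: &Path, marker: &Path) -> io::Result<()> {
        fs::create_dir_all(dir)?;
        match OpenOptions::new().write(true).create_new(true).open(marker) {
            Ok(_) => {}
            // A previous attempt may have created the file but crashed before the
            // fsync, so fall through and fsync again.
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {}
            Err(e) => return Err(e),
        }
        File::open(marker)?.sync_all()?; // flush the (empty) marker file
        File::open(dir)?.sync_all()?;    // flush the directory entry that names it
        Ok(())
    }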
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
@@ -793,8 +834,6 @@ impl Tenant {
remote_client: RemoteTimelineClient,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
info!("downloading index file for timeline {}", timeline_id);
tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
.await
@@ -1059,8 +1098,6 @@ impl Tenant {
local_metadata: TimelineMetadata,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
remote_storage.clone(),
@@ -1227,24 +1264,8 @@ impl Tenant {
"Cannot create timelines on inactive tenant"
);
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
if self.get_timeline(new_timeline_id, false).is_ok() {
debug!("timeline {new_timeline_id} already exists");
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
return Ok(None);
}
@@ -1286,17 +1307,6 @@ impl Tenant {
}
};
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
remote_client.wait_completion().await.with_context(|| {
format!("wait for {} timeline initial uploads to complete", kind)
})?;
}
Ok(Some(loaded_timeline))
}
@@ -1590,8 +1600,6 @@ impl Tenant {
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
self.state.send_modify(|current_state| {
match &*current_state {
@@ -2083,7 +2091,6 @@ impl Tenant {
// enough to just fsync it always.
crashsafe::fsync(target_config_parent)?;
// XXX we're not fsyncing the parent dir, need to do that in case `creating_tenant`
Ok(())
};
@@ -2369,18 +2376,17 @@ impl Tenant {
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let new_timeline = {
let mut timelines = self.timelines.lock().unwrap();
self.prepare_timeline(
let mut timelines = self.timelines.lock().unwrap();
let new_timeline = self
.prepare_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true, true)?
};
.initialize_with_lock(ctx, &mut timelines, true, true)?;
drop(timelines);
// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
@@ -2727,23 +2733,15 @@ fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> a
Ok(())
}
pub(crate) enum CreateTenantFilesMode {
Create,
Attach,
}
pub(crate) fn create_tenant_files(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
mode: CreateTenantFilesMode,
) -> anyhow::Result<PathBuf> {
let target_tenant_directory = conf.tenant_path(&tenant_id);
anyhow::ensure!(
!target_tenant_directory
.try_exists()
.context("check existence of tenant directory")?,
"tenant directory already exists",
!target_tenant_directory.exists(),
"cannot create new tenant repo: '{tenant_id}' directory already exists",
);
let temporary_tenant_dir =
@@ -2765,7 +2763,6 @@ pub(crate) fn create_tenant_files(
conf,
tenant_conf,
tenant_id,
mode,
&temporary_tenant_dir,
&target_tenant_directory,
);
@@ -2790,28 +2787,9 @@ fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
mode: CreateTenantFilesMode,
temporary_tenant_dir: &Path,
target_tenant_directory: &Path,
) -> Result<(), anyhow::Error> {
match mode {
CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
CreateTenantFilesMode::Attach => {
let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME);
let file = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&attach_marker_path)
.with_context(|| {
format!("could not create attach marker file {attach_marker_path:?}")
})?;
file.sync_all().with_context(|| {
format!("could not sync attach marker file: {attach_marker_path:?}")
})?;
// fsync of the directory in which the file resides comes later in this function
}
}
let temporary_tenant_timelines_dir = rebase_directory(
&conf.timelines_path(&tenant_id),
target_tenant_directory,
@@ -2838,11 +2816,6 @@ fn try_create_target_tenant_dir(
anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
});
// Make sure the current tenant directory entries are durable before renaming.
// Without this, a crash may reorder any of the directory entry creations above.
crashsafe::fsync(temporary_tenant_dir)
.with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
format!(
"move tenant {} temporary directory {} into the permanent one {}",
@@ -3978,28 +3951,3 @@ mod tests {
Ok(())
}
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_id() {}
#[cfg(debug_assertions)]
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
utils::tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
utils::tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_id() {
use utils::tracing_span_assert;
match tracing_span_assert::check_fields_present([&*TENANT_ID_EXTRACTOR]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}

View File

@@ -9,7 +9,7 @@
//! may lead to a data loss.
//!
use anyhow::Context;
use pageserver_api::models;
use pageserver_api::models::{TenantConfigRequest, TenantCreateRequest};
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
@@ -292,10 +292,10 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &'_ models::TenantConfig) -> Result<Self, Self::Error> {
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
@@ -377,6 +377,84 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
}
}
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}
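Every duration field above follows the same convention: an optional human-readable string ("10 m", "0s") parsed with humantime, with the field name baked into the error message. A hedged sketch of that pattern as a free function (the helper name is made up):

    use anyhow::Context;
    use std::time::Duration;

    fn parse_optional_period(
        field: &'static str,
        value: Option<&str>,
    ) -> anyhow::Result<Option<Duration>> {
        value
            .map(|v| {
                humantime::parse_duration(v)
                    .with_context(|| format!("Cannot parse `{field}` duration {v:?}"))
            })
            .transpose()
    }

Used as, say, parse_optional_period("gc_period", request.gc_period.as_deref())?, it yields None when the field is absent and a descriptive error when the string does not parse.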
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -19,7 +19,7 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::fs_ext::PathExt;
@@ -282,15 +282,9 @@ pub async fn create_tenant(
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Create)?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let tenant_directory = super::create_tenant_files(conf, tenant_conf, tenant_id)?;
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
let crated_tenant_id = created_tenant.tenant_id();
anyhow::ensure!(
tenant_id == crated_tenant_id,
@@ -472,32 +466,19 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_conf: TenantConfOpt,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, |vacant_entry| {
let tenant_dir = create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Attach)?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
// Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
let marker_file_exists = conf
.tenant_attaching_mark_file_path(&tenant_id)
.try_exists()
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
let attached_tenant_id = attached_tenant.tenant_id();
let tenant_path = conf.tenant_path(&tenant_id);
anyhow::ensure!(
tenant_id == attached_tenant_id,
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
!tenant_path.exists(),
"Cannot attach tenant {tenant_id}, local tenant directory already exists"
);
vacant_entry.insert(Arc::clone(&attached_tenant));
let tenant =
Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx).context("spawn_attach")?;
vacant_entry.insert(tenant);
Ok(())
})
.await

View File

@@ -945,7 +945,7 @@ impl Timeline {
pub async fn wait_to_become_active(
&self,
_ctx: &RequestContext, // Prepare for use by cancellation
_ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), TimelineState> {
let mut receiver = self.state.subscribe();
loop {
@@ -4416,6 +4416,12 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
use utils::tracing_span_assert;
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
@@ -4423,7 +4429,7 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
});
match tracing_span_assert::check_fields_present([
&*super::TENANT_ID_EXTRACTOR,
&*TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
]) {
Ok(()) => (),

View File

@@ -28,8 +28,8 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::BrokerClientChannel;
use storage_broker::Streaming;
use tokio::select;
use tokio::sync::RwLock;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
@@ -50,13 +50,13 @@ pub(super) async fn connection_manager_loop_step(
ctx: &RequestContext,
manager_status: &RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> {
match connection_manager_state
let mut timeline_state_updates = connection_manager_state
.timeline
.wait_to_become_active(ctx)
.await
{
Ok(()) => {}
Err(_) => {
.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
@@ -72,10 +72,6 @@ pub(super) async fn connection_manager_loop_step(
timeline_id: connection_manager_state.timeline.timeline_id,
};
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
@@ -199,6 +195,34 @@ pub(super) async fn connection_manager_loop_step(
}
}
async fn wait_for_active_timeline(
timeline_state_updates: &mut watch::Receiver<TimelineState>,
) -> ControlFlow<(), ()> {
let current_state = *timeline_state_updates.borrow();
if current_state == TimelineState::Active {
return ControlFlow::Continue(());
}
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
TimelineState::Active => {
debug!("Timeline state changed to active, continuing the walreceiver connection manager");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
}
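wait_for_active_timeline is an instance of the standard tokio watch-channel wait loop: check the current value, then block on changed() until either the state you want shows up or the sender is dropped. A generic, hedged sketch of that shape (names assumed, not the project's API):

    use tokio::sync::watch;

    async fn wait_until<T: Clone>(
        rx: &mut watch::Receiver<T>,
        mut pred: impl FnMut(&T) -> bool,
    ) -> Result<T, watch::error::RecvError> {
        loop {
            {
                let current = rx.borrow();
                if pred(&current) {
                    return Ok(current.clone());
                }
            } // drop the borrow guard before awaiting
            rx.changed().await?; // Err(_) means the sender side was dropped
        }
    }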
/// Endlessly try to subscribe for broker updates for a given timeline.
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,

View File

@@ -379,6 +379,17 @@ impl XlXactParsedRecord {
});
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
let sizeof_shared_invalidation_message = 0;
buf.advance(sizeof_shared_invalidation_message);
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
xid = buf.get_u32_le();
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
}
if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
let nitems = buf.get_i32_le();
@@ -386,23 +397,7 @@ impl XlXactParsedRecord {
"XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
nitems
);
let sizeof_xl_xact_stats_item = 12;
buf.advance((nitems * sizeof_xl_xact_stats_item).try_into().unwrap());
}
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
let sizeof_shared_invalidation_message = 16;
buf.advance(
(nmsgs * sizeof_shared_invalidation_message)
.try_into()
.unwrap(),
);
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
xid = buf.get_u32_le();
debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
//FIXME: do we need to handle dropped stats here?
}
XlXactParsedRecord {

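Each optional xinfo section in the commit record has the same shape: a little-endian item count followed by that many fixed-size items, so skipping one is "read the count, advance count * item_size" on the buffer. A small hedged sketch with the bytes crate (the function name is made up; 16 bytes per shared invalidation message and 12 bytes per xl_xact_stats_item are the sizes that appear in the hunk above):

    use bytes::{Buf, Bytes};

    fn skip_fixed_items(buf: &mut Bytes, item_size: usize) {
        let n = buf.get_i32_le();
        let n = usize::try_from(n).expect("negative item count in xact record");
        buf.advance(n * item_size);
    }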
View File

@@ -192,9 +192,8 @@ retry:
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg);
pfree(msg);
neon_log(LOG, "could not get response from pageserver: %s",
PQerrorMessage(pageserver_conn));
return -1;
}
}
@@ -344,7 +343,7 @@ pageserver_receive(void)
resp = NULL;
}
else if (rc == -2)
neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn)));
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
else
neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc);
}
@@ -368,7 +367,7 @@ pageserver_flush(void)
}
else if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);

View File

@@ -7,7 +7,6 @@ mod credentials;
pub use credentials::ClientCredentials;
mod password_hack;
pub use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;
mod flow;
@@ -45,10 +44,10 @@ pub enum AuthErrorImpl {
#[error(
"Endpoint ID is not specified. \
Either please upgrade the postgres client library (libpq) for SNI support \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=endpoint%3D<endpoint-id>'. \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=project%3D<endpoint-id>'. \
See more at https://neon.tech/sni"
)]
MissingEndpointName,
MissingProjectName,
#[error("password authentication failed for user '{0}'")]
AuthFailed(Box<str>),
@@ -89,7 +88,7 @@ impl UserFacingError for AuthError {
AuthFailed(_) => self.to_string(),
BadAuthMethod(_) => self.to_string(),
MalformedPassword(_) => self.to_string(),
MissingEndpointName => self.to_string(),
MissingProjectName => self.to_string(),
Io(_) => "Internal error".to_string(),
}
}

View File

@@ -52,8 +52,8 @@ pub async fn password_hack(
.authenticate()
.await?;
info!(project = &payload.endpoint, "received missing parameter");
creds.project = Some(payload.endpoint);
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

View File

@@ -1,7 +1,6 @@
//! User credentials used in authentication.
use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
use itertools::Itertools;
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
use thiserror::Error;
@@ -62,15 +61,7 @@ impl<'a> ClientCredentials<'a> {
// Project name might be passed via PG's command-line options.
let project_option = params
.options_raw()
.and_then(|options| {
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
// However, if both are present, we don't exactly know which one to use.
// Therefore we require that only one of them is present.
options
.filter_map(parse_endpoint_param)
.at_most_one()
.ok()?
})
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.map(|name| name.to_string());
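The longer variant above spells out the rule: both endpoint= (preferred) and project= (deprecated) are accepted, but if more than one candidate is passed the value is treated as absent rather than guessed. A hedged sketch of that rule as a standalone helper (name made up), using the same itertools at_most_one call:

    use itertools::Itertools;

    fn endpoint_from_options<'a>(options: impl Iterator<Item = &'a str>) -> Option<String> {
        options
            .filter_map(|opt| {
                opt.strip_prefix("endpoint=")
                    .or_else(|| opt.strip_prefix("project="))
            })
            .at_most_one() // Err(..) if two or more candidates were passed
            .ok()
            .flatten()
            .map(|name| name.to_string())
    }

So ["-ckey=1", "endpoint=bar"] yields Some("bar"), while passing both endpoint= and project= (or two endpoint= values) yields None, matching the parse_endpoint_from_options tests shown further down.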
let project_from_domain = if let Some(sni_str) = sni {
@@ -186,51 +177,6 @@ mod tests {
Ok(())
}
#[test]
fn parse_endpoint_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar -c geqo=off"),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("bar"));
Ok(())
}
#[test]
fn parse_three_endpoints_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
(
"options",
"-ckey=1 endpoint=one endpoint=two endpoint=three -c geqo=off",
),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());
Ok(())
}
#[test]
fn parse_when_endpoint_and_project_are_in_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());
Ok(())
}
#[test]
fn parse_projects_identical() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);

View File

@@ -91,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
// the user neither enabled SNI nor resorted to any other method
// for passing the project name we rely on. We should show them
// the most helpful error message and point to the documentation.
.ok_or(AuthErrorImpl::MissingEndpointName)?;
.ok_or(AuthErrorImpl::MissingProjectName)?;
Ok(payload)
}

View File

@@ -6,55 +6,27 @@
use bstr::ByteSlice;
pub struct PasswordHackPayload {
pub endpoint: String,
pub project: String,
pub password: Vec<u8>,
}
impl PasswordHackPayload {
pub fn parse(bytes: &[u8]) -> Option<Self> {
// The format is `project=<utf-8>;<password-bytes>`.
let mut iter = bytes.splitn_str(2, ";");
let endpoint = iter.next()?.to_str().ok()?;
let endpoint = parse_endpoint_param(endpoint)?.to_owned();
let mut iter = bytes.strip_prefix(b"project=")?.splitn_str(2, ";");
let project = iter.next()?.to_str().ok()?.to_owned();
let password = iter.next()?.to_owned();
Some(Self { endpoint, password })
Some(Self { project, password })
}
}
pub fn parse_endpoint_param(bytes: &str) -> Option<&str> {
bytes
.strip_prefix("project=")
.or_else(|| bytes.strip_prefix("endpoint="))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_endpoint_param_fn() {
let input = "";
assert!(parse_endpoint_param(input).is_none());
let input = "project=";
assert_eq!(parse_endpoint_param(input), Some(""));
let input = "project=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
let input = "endpoint=";
assert_eq!(parse_endpoint_param(input), Some(""));
let input = "endpoint=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
let input = "other_option=foobar";
assert!(parse_endpoint_param(input).is_none());
}
#[test]
fn parse_password_hack_payload_project() {
fn parse_password_hack_payload() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());
@@ -62,33 +34,13 @@ mod tests {
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"project=;";
let payload: PasswordHackPayload =
PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.project, "");
assert_eq!(payload.password, b"");
let bytes = b"project=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.password, b"pass;word");
}
#[test]
fn parse_password_hack_payload_endpoint() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"endpoint=";
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"endpoint=;";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
assert_eq!(payload.password, b"");
let bytes = b"endpoint=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.project, "foobar");
assert_eq!(payload.password, b"pass;word");
}
}

View File

@@ -1,4 +1,4 @@
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -279,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none())
.filter(|opt| !opt.starts_with("project="))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
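This is the mirror image of the endpoint-parameter parsing: whatever the proxy consumed (endpoint=/project=) must be stripped before the remaining options are forwarded to Postgres verbatim. A hedged sketch of that filtering (helper name made up; the allow attribute mirrors the unstable-name-collision note in the hunk above):

    use itertools::Itertools;

    #[allow(unstable_name_collisions)]
    fn forwarded_options<'a>(options: impl Iterator<Item = &'a str>) -> Option<String> {
        let out: String = options
            .filter(|opt| !opt.starts_with("endpoint=") && !opt.starts_with("project="))
            .intersperse(" ") // re-join with single spaces
            .collect();
        (!out.is_empty()).then_some(out)
    }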

View File

@@ -36,9 +36,11 @@ module.exports = async ({ github, context, fetch, report }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// Let users know that the comment is updated automatically
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results<br>${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:</sub></div>`
const autoupdateNotice = `<div align="right"><sub>The comment gets automatically updated with the latest test results :recycle:</sub></div>`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// The latest commit in the PR URL
const commitUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/pull/${context.payload.number}/commits/${context.payload.pull_request.head.sha}`
// Comment body itself
let commentBody = `${startMarker}\n`
@@ -72,46 +74,36 @@ module.exports = async ({ github, context, fetch, report }) => {
let flakyTestsCount = 0
const pgVersions = new Set()
const buildTypes = new Set()
for (const parentSuite of suites.children) {
for (const suite of parentSuite.children) {
for (const test of suite.children) {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({buildType, pgVersion} = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres version into the test name only for the regress suite, not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
buildType = "release"
pgVersion = "14"
}
const {groups: {buildType, pgVersion}} = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)
pgVersions.add(pgVersion)
buildTypes.add(buildType)
// Removing build type and PostgreSQL version from the test name to make it shorter
const testName = test.name.replace(new RegExp(`${buildType}-pg${pgVersion}-?`), "").replace("[]", "")
test.pytestName = `${parentSuite.name.replace(".", "/")}/${suite.name}.py::${testName}`
test.pgVersion = pgVersion
test.buildType = buildType
if (test.status === "passed") {
passedTests[pgVersion][testName].push(test)
passedTests[pgVersion][buildType].push(test)
passedTestsCount += 1
} else if (test.status === "failed" || test.status === "broken") {
failedTests[pgVersion][testName].push(test)
failedTests[pgVersion][buildType].push(test)
failedTestsCount += 1
} else if (test.status === "skipped") {
skippedTests[pgVersion][testName].push(test)
skippedTests[pgVersion][buildType].push(test)
skippedTestsCount += 1
}
if (test.retriesCount > 0) {
retriedTests[pgVersion][testName].push(test)
retriedTests[pgVersion][buildType].push(test)
if (test.retriesStatusChange) {
flakyTests[pgVersion][testName].push(test)
flakyTests[pgVersion][buildType].push(test)
flakyTestsCount += 1
}
}
@@ -120,44 +112,39 @@ module.exports = async ({ github, context, fetch, report }) => {
}
const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n`
commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}) for ${commitUrl})\n___\n`
// Print test results from the newest to the oldest Postgres version for release and debug builds.
// Print test results from the newest to the oldest PostgreSQL version for release and debug builds.
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(failedTests[pgVersion]).length > 0) {
commentBody += `#### Failures on Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(failedTests[pgVersion])) {
const links = []
for (const test of tests) {
for (const buildType of Array.from(buildTypes).sort().reverse()) {
if (failedTests[pgVersion][buildType].length > 0) {
commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n`
commentBody += `Failed tests:\n`
for (const test of failedTests[pgVersion][buildType]) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
links.push(`[${test.buildType}](${allureLink})`)
commentBody += `- [\`${test.pytestName}\`](${allureLink})`
if (test.retriesCount > 0) {
commentBody += ` (ran [${test.retriesCount + 1} times](${allureLink}/retries))`
}
commentBody += "\n"
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
commentBody += "\n"
}
const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name)
const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"`
commentBody += "```\n"
commentBody += `# Run the tests that failed on Postgres ${pgVersion} locally:\n`
commentBody += `${command}\n`
commentBody += "```\n"
}
}
if (flakyTestsCount > 0) {
commentBody += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
commentBody += "<details>\n<summary>Flaky tests</summary>\n\n"
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
if (Object.keys(flakyTests[pgVersion]).length > 0) {
commentBody += `#### Postgres ${pgVersion}\n\n`
for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) {
const links = []
for (const test of tests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}/retries`
for (const buildType of Array.from(buildTypes).sort().reverse()) {
if (flakyTests[pgVersion][buildType].length > 0) {
commentBody += `#### PostgreSQL ${pgVersion} (${buildType} build)\n\n`
for (const test of flakyTests[pgVersion][buildType]) {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
links.push(`[${status} ${test.buildType}](${allureLink})`)
commentBody += `- ${status} [\`${test.pytestName}\`](${reportUrl}#suites/${test.parentUid}/${test.uid}/retries)\n`
}
commentBody += `- \`${testName}\`: ${links.join(", ")}\n`
commentBody += "\n"
}
}
}

View File

@@ -272,7 +272,6 @@ class PageserverHttpClient(requests.Session):
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
@@ -282,9 +281,7 @@ class PageserverHttpClient(requests.Session):
if pg_version != PgVersion.NOT_SET:
body["pg_version"] = int(pg_version)
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
)
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")

View File

@@ -136,7 +136,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# remove the initial tenant
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
assert env.initial_timeline
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
pageserver_http.tenant_detach(env.initial_tenant)
assert isinstance(env.remote_storage, LocalFsStorage)
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)

View File

@@ -20,7 +20,6 @@ from fixtures.pageserver.utils import (
assert_tenant_state,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
wait_until_tenant_state,
)
from fixtures.types import Lsn
@@ -64,15 +63,12 @@ def test_ondemand_download_large_rel(
tenant, _ = env.neon_cli.create_tenant(
conf={
# disable background GC
"gc_period": "0s",
"gc_period": "10 m",
"gc_horizon": f"{10 * 1024 ** 3}", # 10 GB
# small checkpoint distance to create more delta layer files
"checkpoint_distance": f"{10 * 1024 ** 2}", # 10 MB
# allow compaction with the checkpoint
"compaction_threshold": "3",
"compaction_target_size": f"{10 * 1024 ** 2}", # 10 MB
# but don't run compaction in background or on restart
"compaction_period": "0s",
}
)
env.initial_tenant = tenant
@@ -99,17 +95,9 @@ def test_ondemand_download_large_rel(
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# stop endpoint before checkpoint to stop wal generation
endpoint.stop()
# stopping the safekeepers now helps us avoid calculating the logical size
# after startup, so that page requests are the only thing triggering on-demand
# downloads of the layers
for sk in env.safekeepers:
sk.stop()
# run checkpoint manually to be sure that data landed in remote storage
client.timeline_checkpoint(tenant_id, timeline_id)
@@ -118,6 +106,7 @@ def test_ondemand_download_large_rel(
log.info("uploads have finished")
##### Stop the first pageserver instance, erase all its data
endpoint.stop()
env.pageserver.stop()
# remove all the layer files
@@ -128,13 +117,8 @@ def test_ondemand_download_large_rel(
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
# start a readonly endpoint which we'll use to check the database.
# readonly (with lsn=) is required so that we don't try to connect to
# safekeepers, that have now been shut down.
endpoint = env.endpoints.create_start("main", lsn=current_lsn)
endpoint.start()
before_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
assert before_downloads != 0, "basebackup should trigger on-demand download of at least one layer"
# Probe in the middle of the table. There's a high chance that the beginning
# and end of the table was stored together in the same layer files with data
@@ -165,7 +149,6 @@ def test_ondemand_download_timetravel(
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# Override defaults, to create more layers
tenant, _ = env.neon_cli.create_tenant(
@@ -242,8 +225,7 @@ def test_ondemand_download_timetravel(
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
# Wait until generated image layers are uploaded to S3
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
time.sleep(3)
env.pageserver.stop()

View File

@@ -5,18 +5,16 @@ import pytest
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
def test_proxy_select_1(static_proxy: NeonProxy):
"""
A simplest smoke test: check proxy against a local postgres instance.
"""
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
out = static_proxy.safe_psql("select 1", options="project=generic-project-name")
assert out[0][0] == 1
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_password_hack(static_proxy: NeonProxy, option_name: str):
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for
clients which can't provide the project/endpoint name via SNI or `options`.
@@ -25,12 +23,11 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str):
user = "borat"
password = "password"
static_proxy.safe_psql(
f"create role {user} with login password '{password}'",
options=f"{option_name}=irrelevant",
f"create role {user} with login password '{password}'", options="project=irrelevant"
)
# Note the format of `magic`!
magic = f"{option_name}=irrelevant;{password}"
magic = f"project=irrelevant;{password}"
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
# Must also check that invalid magic won't be accepted.
@@ -59,62 +56,55 @@ async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
assert out == "42"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_options(static_proxy: NeonProxy, option_name: str):
def test_proxy_options(static_proxy: NeonProxy):
"""
Check that we pass extra `options` to the PostgreSQL server:
* `project=...` and `endpoint=...` shouldn't be passed at all
* (otherwise postgres will raise an error).
* `project=...` shouldn't be passed at all (otherwise postgres will raise an error).
* everything else should be passed as-is.
"""
options = f"{option_name}=irrelevant -cproxytest.option=value"
options = "project=irrelevant -cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
assert out[0][0] == "value"
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
options = "-c proxytest.foo=\\ str project=irrelevant"
out = static_proxy.safe_psql("show proxytest.foo", options=options)
assert out[0][0] == " str"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
def test_auth_errors(static_proxy: NeonProxy):
"""
Check that we throw very specific errors in some unsuccessful auth scenarios.
"""
# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
static_proxy.safe_psql(
"create role pinocchio with login password 'magic'",
options=f"{option_name}=irrelevant",
"create role pinocchio with login password 'magic'", options="project=irrelevant"
)
# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# Finally, check that the user can connect
with static_proxy.connect(
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
):
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
pass
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
def test_forward_params_to_client(static_proxy: NeonProxy):
"""
Check that we forward all necessary PostgreSQL server params to client.
"""
@@ -140,7 +130,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
where name = any(%s)
"""
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
with static_proxy.connect(options="project=irrelevant") as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
@@ -148,18 +138,17 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
assert conn.get_parameter_status(name) == value
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
def test_close_on_connections_exit(static_proxy: NeonProxy):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
options=f"{option_name}=irrelevant"
with static_proxy.connect(options="project=irrelevant"), static_proxy.connect(
options="project=irrelevant"
):
static_proxy.terminate()
with pytest.raises(subprocess.TimeoutExpired):
static_proxy.wait_for_exit(timeout=2)
# Ensure we don't accept any more connections
with pytest.raises(psycopg2.OperationalError):
static_proxy.connect(options=f"{option_name}=irrelevant")
static_proxy.connect(options="project=irrelevant")
static_proxy.wait_for_exit()

View File

@@ -2,12 +2,11 @@
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os
import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Tuple
import pytest
from fixtures.log_helper import log
@@ -27,7 +26,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import print_gc_result, query_scalar, wait_until
from requests import ReadTimeout
#
@@ -83,7 +81,9 @@ def test_remote_storage_backup_and_restore(
env.pageserver.allowed_errors.append(".*failed to load remote timeline.*")
# we have a bunch of pytest.raises for these below
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
env.pageserver.allowed_errors.append(".*simulated failure of remote operation.*")
pageserver_http = env.pageserver.http_client()
@@ -626,7 +626,10 @@ def test_empty_branch_remote_storage_upload(
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
timelines_before_detach = set(
map(
@@ -655,19 +658,13 @@ def test_empty_branch_remote_storage_upload(
), f"Expected to have same timelines after reattach, but got {timelines_after_detach}"
# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only.
# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart.
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_empty_branch_remote_storage_upload_on_restart(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
"""
Branches off a root branch, but does not write anything to the new branch, so
it has a metadata file only.
Ensures the branch is not on the remote storage and restarts the pageserver
— the upload should be scheduled by load, and create_timeline should await
for it even though it gets 409 Conflict.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_empty_branch_remote_storage_upload_on_restart",
@@ -676,87 +673,35 @@ def test_empty_branch_remote_storage_upload_on_restart(
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
client.configure_failpoints(("before-upload-index", "return"))
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
new_branch_timeline_id = TimelineId.generate()
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
with pytest.raises(ReadTimeout):
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
timeout=4,
)
env.pageserver.allowed_errors.append(
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
# index upload is now hitting the failpoint, should not block the shutdown
env.pageserver.stop()
timeline_path = (
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
)
local_metadata = env.repo_dir / timeline_path / "metadata"
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
# Remove new branch from the remote storage
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
new_branch_on_remote_storage = (
env.remote_storage.root
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(new_branch_timeline_id)
)
assert (
not new_branch_on_remote_storage.exists()
), "failpoint should had prohibited index_part.json upload"
new_branch_on_remote_storage.is_dir()
), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage"
shutil.rmtree(new_branch_on_remote_storage)
# during reconciliation we should have scheduled the uploads, and on the
# retried create_timeline we will wait for those to complete on the next
# client.timeline_create
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
env.pageserver.start()
# sleep a bit to force the upload task go into exponential backoff
time.sleep(1)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
def create_in_background():
barrier.wait()
try:
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
)
q.put(None)
except PageserverApiException as e:
q.put(e)
create_thread = threading.Thread(target=create_in_background)
create_thread.start()
try:
# maximize chances of actually waiting for the uploads by create_timeline
barrier.wait()
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
client.configure_failpoints(("before-upload-index", "off"))
conflict = q.get()
assert conflict, "create_timeline should not have succeeded"
assert (
conflict.status_code == 409
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage / "index_part.json"
).is_file(), "uploads scheduled during initial load should had been awaited for"
finally:
create_thread.join()
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage.is_dir()
), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
def wait_upload_queue_empty(
@@ -807,17 +752,4 @@ def get_queued_count(
return int(val)
def assert_nothing_to_upload(
client: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
):
"""
Check last_record_lsn == remote_consistent_lsn. Assert works only for empty timelines, which
do not have anything to compact or gc.
"""
detail = client.timeline_detail(tenant_id, timeline_id)
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
# TODO Test that we correctly handle GC of files that are stuck in upload queue.

View File

@@ -685,10 +685,12 @@ def test_load_attach_negatives(
pageserver_http.tenant_ignore(tenant_id)
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
with pytest.raises(
expected_exception=PageserverApiException,
match="tenant directory already exists",
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
):
pageserver_http.tenant_attach(tenant_id)
@@ -732,10 +734,12 @@ def test_ignore_while_attaching(
pageserver_http.tenant_ignore(tenant_id)
# Cannot attach it due to some local files existing
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
env.pageserver.allowed_errors.append(
".*Cannot attach tenant .*?, local tenant directory already exists.*"
)
with pytest.raises(
expected_exception=PageserverApiException,
match="tenant directory already exists",
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
):
pageserver_http.tenant_attach(tenant_id)

View File

@@ -23,6 +23,7 @@ from fixtures.pageserver.utils import (
tenant_exists,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
@@ -545,7 +546,7 @@ def test_emergency_relocate_with_branches_slow_replay(
# - A logical replication message between the inserts, so that we can conveniently
# pause the WAL ingestion between the two inserts.
# - Child branch, created after the inserts
tenant_id, _ = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
@@ -558,7 +559,14 @@ def test_emergency_relocate_with_branches_slow_replay(
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
main_endpoint.stop()
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Now kill the pageserver, remove the tenant directory, and restart. This simulates
# the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
@@ -696,7 +704,7 @@ def test_emergency_relocate_with_branches_createdb(
pageserver_http = env.pageserver.http_client()
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
@@ -704,7 +712,9 @@ def test_emergency_relocate_with_branches_createdb(
cur.execute("CREATE DATABASE neondb")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
with main_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
@@ -715,6 +725,11 @@ def test_emergency_relocate_with_branches_createdb(
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
child_endpoint.stop()
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))

View File

@@ -217,16 +217,6 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
log.info(f"{sample.name}{{{labels}}} {sample.value}")
# Test that we gather tenant create metric
storage_operation_metrics = [
"pageserver_storage_operations_seconds_global_bucket",
"pageserver_storage_operations_seconds_global_sum",
"pageserver_storage_operations_seconds_global_count",
]
for metric in storage_operation_metrics:
value = ps_metrics.query_all(metric, filter={"operation": "create tenant"})
assert value
@pytest.mark.parametrize(
"remote_storage_kind",