Compare commits

..

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
f9dbed33a7 Replace not-thread safe LWlocks in LFC with pthread sync primitives 2025-02-21 08:43:43 +02:00
Konstantin Knizhnik
d677e85a14 Undo atomics optimization 2025-02-20 14:53:08 +02:00
Konstantin Knizhnik
598463bffe Fix issues with prtefetch 2025-02-20 10:16:54 +02:00
Konstantin Knizhnik
6d84c4057d Fix issues with prtefetch 2025-02-20 10:14:51 +02:00
Konstantin Knizhnik
43497cd7c0 Save getpage result in LFC 2025-02-19 17:58:21 +02:00
Konstantin Knizhnik
bc07358034 First working version 2025-02-19 16:59:07 +02:00
Konstantin Knizhnik
426101e38f Use pthread cond instead of Postgres latch for notification of coordinator 2025-02-18 19:19:32 +02:00
Konstantin Knizhnik
914cc529ba Fix more bugs 2025-02-18 10:18:13 +02:00
Konstantin Knizhnik
06ebb42db5 Bug fixing 2025-02-17 22:32:49 +02:00
Konstantin Knizhnik
93c92eb170 Fix bugs 2025-02-16 21:46:58 +02:00
Konstantin Knizhnik
bd67d1b7c8 Fix bugs 2025-02-16 21:46:44 +02:00
Konstantin Knizhnik
cfb1b63351 Fix bugs 2025-02-16 21:46:24 +02:00
Konstantin Knizhnik
782ad8fda4 Impoleent PS communicator 2025-02-16 18:14:34 +02:00
161 changed files with 2795 additions and 7455 deletions

View File

@@ -28,7 +28,3 @@ config-variables:
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION

View File

@@ -19,11 +19,7 @@ inputs:
default: '[1, 1]'
# settings below only needed if you want the project to be sharded from the beginning
shard_split_project:
description: 'by default new projects are not shard-split initiailly, but only when shard-split threshold is reached, specify true to explicitly shard-split initially'
required: false
default: 'false'
disable_sharding:
description: 'by default new projects use storage controller default policy to shard-split when shard-split threshold is reached, specify true to explicitly disable sharding'
description: 'by default new projects are not shard-split, specify true to shard-split'
required: false
default: 'false'
admin_api_key:
@@ -111,21 +107,6 @@ runs:
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"new_shard_count\": $SHARD_COUNT, \"new_stripe_size\": $STRIPE_SIZE}"
fi
if [ "${DISABLE_SHARDING}" = "true" ]; then
# determine tenant ID
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
echo "Explicitly disabling shard-splitting for project ${project_id} with tenant_id ${TENANT_ID}"
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy"
echo "with body {\"scheduling\": \"Essential\"}"
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
curl -X PUT \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy" \
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"scheduling\": \"Essential\"}"
fi
env:
API_HOST: ${{ inputs.api_host }}
@@ -135,7 +116,6 @@ runs:
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
SHARD_SPLIT_PROJECT: ${{ inputs.shard_split_project }}
DISABLE_SHARDING: ${{ inputs.disable_sharding }}
ADMIN_API_KEY: ${{ inputs.admin_api_key }}
SHARD_COUNT: ${{ inputs.shard_count }}
STRIPE_SIZE: ${{ inputs.stripe_size }}

View File

@@ -348,10 +348,6 @@ jobs:
rerun_failed: true
pg_version: ${{ matrix.pg_version }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty

View File

@@ -2,7 +2,7 @@ name: Push images to Container Registry
on:
workflow_call:
inputs:
# Example: {"docker.io/neondatabase/neon:13196061314":["${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
# Example: {"docker.io/neondatabase/neon:13196061314":["369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
image-map:
description: JSON map of images, mapping from a source image to an array of target images that should be pushed.
required: true

View File

@@ -68,7 +68,7 @@ jobs:
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
@@ -859,17 +859,14 @@ jobs:
BRANCH: "${{ github.ref_name }}"
DEV_ACR: "${{ vars.AZURE_DEV_REGISTRY_NAME }}"
PROD_ACR: "${{ vars.AZURE_PROD_REGISTRY_NAME }}"
DEV_AWS: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
PROD_AWS: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
AWS_REGION: "${{ vars.AWS_ECR_REGION }}"
push-neon-image-dev:
needs: [ generate-image-maps, neon-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.neon-dev }}'
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
aws-region: eu-central-1
aws-account-ids: "369495373322"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -884,8 +881,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.compute-dev }}'
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
aws-region: eu-central-1
aws-account-ids: "369495373322"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -901,8 +898,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.neon-prod }}'
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
aws-region: eu-central-1
aws-account-ids: "093970136003"
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -918,8 +915,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.compute-prod }}'
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
aws-region: eu-central-1
aws-account-ids: "093970136003"
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -1032,7 +1029,7 @@ jobs:
statuses: write
contents: write
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/ansible:latest
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
steps:
- uses: actions/checkout@v4
@@ -1181,22 +1178,6 @@ jobs:
exit 1
fi
notify-storage-release-deploy-failure:
needs: [ deploy ]
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
runs-on: ubuntu-22.04
steps:
- name: Post release-deploy failure to team-storage slack channel
uses: slackapi/slack-github-action@v2
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
text: |
🔴 @oncall-storage: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:
needs: [ deploy ]
@@ -1293,7 +1274,7 @@ jobs:
done
pin-build-tools-image:
needs: [ build-build-tools-image, test-images, build-and-test-locally ]
needs: [ build-build-tools-image, push-compute-image-prod, push-neon-image-prod, build-and-test-locally ]
if: github.ref_name == 'main'
uses: ./.github/workflows/pin-build-tools-image.yml
with:

View File

@@ -27,7 +27,7 @@ env:
jobs:
tag:
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}

View File

@@ -32,27 +32,18 @@ jobs:
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
disable_sharding: false
- target_project: new_empty_project_stripe_size_32768
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
# while here it is sharded from the beginning with a shard size of 256 MiB
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 17
- target_project: large_existing_project
stripe_size: null # cannot re-shared or choose different stripe size for existing, already sharded project
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project_unsharded
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: true
postgres_version: 16
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
@@ -105,7 +96,6 @@ jobs:
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
shard_count: 8
stripe_size: ${{ matrix.stripe_size }}
disable_sharding: ${{ matrix.disable_sharding }}
- name: Initialize Neon project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}

View File

@@ -33,6 +33,10 @@ concurrency:
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
env:
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: pinned
jobs:
check-manifests:
runs-on: ubuntu-22.04
@@ -42,14 +46,11 @@ jobs:
steps:
- name: Check if we really need to pin the image
id: check-manifests
env:
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: pinned
run: |
docker manifest inspect "docker.io/neondatabase/build-tools:${FROM_TAG}" > "${FROM_TAG}.json"
docker manifest inspect "docker.io/neondatabase/build-tools:${TO_TAG}" > "${TO_TAG}.json"
docker manifest inspect neondatabase/build-tools:${FROM_TAG} > ${FROM_TAG}.json
docker manifest inspect neondatabase/build-tools:${TO_TAG} > ${TO_TAG}.json
if diff "${FROM_TAG}.json" "${TO_TAG}.json"; then
if diff ${FROM_TAG}.json ${TO_TAG}.json; then
skip=true
else
skip=false
@@ -63,34 +64,55 @@ jobs:
# use format(..) to catch both inputs.force = true AND inputs.force = 'true'
if: needs.check-manifests.outputs.skip == 'false' || format('{0}', inputs.force) == 'true'
permissions:
id-token: write # Required for aws/azure login
runs-on: ubuntu-22.04
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bullseye": [
"docker.io/neondatabase/build-tools:pinned-bullseye",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bullseye",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bullseye"
],
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bookworm": [
"docker.io/neondatabase/build-tools:pinned-bookworm",
"docker.io/neondatabase/build-tools:pinned",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bookworm",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bookworm",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned"
]
}
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
acr-registry-name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
secrets:
aws-role-to-assume: "${{ vars.DEV_AWS_OIDC_ROLE_ARN }}"
docker-hub-username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
permissions:
id-token: write # for `azure/login` and aws auth
steps:
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 3600
- name: Login to Amazon Dev ECR
uses: aws-actions/amazon-ecr-login@v2
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
run: |
az acr login --name=neoneastus2
- name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
env:
DEFAULT_DEBIAN_VERSION: bookworm
run: |
for debian_version in bullseye bookworm; do
tags=()
tags+=("-t" "neondatabase/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}-${debian_version}")
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
tags+=("-t" "neondatabase/build-tools:${TO_TAG}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}")
fi
docker buildx imagetools create "${tags[@]}" \
neondatabase/build-tools:${FROM_TAG}-${debian_version}
done

View File

@@ -1,41 +0,0 @@
name: Regenerate Postgres Settings
on:
pull_request:
types:
- opened
- synchronize
- reopened
paths:
- pgxn/neon/**.c
- vendor/postgres-v*
- vendor/revisions.json
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
permissions:
pull-requests: write
jobs:
regenerate-pg-settings:
runs-on: ubuntu-22.04
steps:
- name: Add comment
uses: thollander/actions-comment-pull-request@v3
with:
comment-tag: ${{ github.job }}
pr-number: ${{ github.event.number }}
message: |
If this PR added a GUC in the Postgres fork or `neon` extension,
please regenerate the Postgres settings in the `cloud` repo:
```
make NEON_WORKDIR=path/to/neon/checkout \
-C goapp/internal/shareddomain/postgres generate
```
If you're an external contributor, a Neon employee will assist in
making sure this step is done.

31
Cargo.lock generated
View File

@@ -786,7 +786,7 @@ dependencies = [
[[package]]
name = "azure_core"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -815,7 +815,7 @@ dependencies = [
[[package]]
name = "azure_identity"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"async-lock",
"async-trait",
@@ -834,7 +834,7 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"RustyXML",
"async-lock",
@@ -852,7 +852,7 @@ dependencies = [
[[package]]
name = "azure_storage_blobs"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"RustyXML",
"azure_core",
@@ -872,7 +872,7 @@ dependencies = [
[[package]]
name = "azure_svc_blobstorage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"azure_core",
"bytes",
@@ -1029,6 +1029,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
[[package]]
name = "bstr"
version = "1.5.0"
@@ -1303,7 +1309,6 @@ dependencies = [
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"aws-smithy-types",
"axum",
"base64 0.13.1",
"bytes",
@@ -1316,6 +1321,7 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"notify",
@@ -1325,6 +1331,7 @@ dependencies = [
"opentelemetry_sdk",
"postgres",
"postgres_initdb",
"prometheus",
"regex",
"remote_storage",
"reqwest",
@@ -1343,13 +1350,13 @@ dependencies = [
"tower 0.5.2",
"tower-http",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-utils",
"url",
"utils",
"uuid",
"vm_monitor",
"walkdir",
"workspace_hack",
"zstd",
]
@@ -4922,6 +4929,7 @@ dependencies = [
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"boxcar",
"bstr",
"bytes",
"camino",
@@ -4973,6 +4981,7 @@ dependencies = [
"postgres-protocol2",
"postgres_backend",
"pq_proto",
"prometheus",
"rand 0.8.5",
"rand_distr",
"rcgen",
@@ -4997,6 +5006,7 @@ dependencies = [
"smallvec",
"smol_str",
"socket2",
"strum",
"strum_macros",
"subtle",
"thiserror 1.0.69",
@@ -5011,6 +5021,7 @@ dependencies = [
"tracing",
"tracing-log",
"tracing-opentelemetry",
"tracing-serde",
"tracing-subscriber",
"tracing-utils",
"try-lock",
@@ -6465,7 +6476,6 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
@@ -7019,11 +7029,14 @@ dependencies = [
name = "tokio-postgres2"
version = "0.1.0"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol2",
@@ -7610,13 +7623,13 @@ dependencies = [
"hex",
"hex-literal",
"humantime",
"inferno 0.12.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"once_cell",
"pin-project-lite",
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",

View File

@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.85.0
ENV RUSTC_VERSION=1.84.1
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -148,7 +148,7 @@ RUN case $DEBIAN_VERSION in \
apt install --no-install-recommends --no-install-suggests -y \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/*
@@ -1464,31 +1464,6 @@ RUN make release -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
#########################################################################################
#
# Layer "pg-duckdb-pg-build"
# compile pg_duckdb extension
#
#########################################################################################
FROM build-deps AS pg_duckdb-src
WORKDIR /ext-src
COPY compute/patches/pg_duckdb_v031.patch .
# pg_duckdb build requires source dir to be a git repo to get submodules
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
# - extension management function duckdb.install_extension()
# - access to duckdb.extensions table and its sequence
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
cd pg_duckdb-src && \
git submodule update --init --recursive && \
patch -p1 < /ext-src/pg_duckdb_v031.patch
FROM pg-build AS pg_duckdb-build
ARG PG_VERSION
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
WORKDIR /ext-src/pg_duckdb-src
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
#########################################################################################
#
# Layer "pg_repack"
@@ -1509,73 +1484,6 @@ WORKDIR /ext-src/pg_repack-src
RUN make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install
#########################################################################################
#
# Layer "pgaudit"
# compile pgaudit extension
#
#########################################################################################
FROM build-deps AS pgaudit-src
ARG PG_VERSION
WORKDIR /ext-src
RUN case "${PG_VERSION}" in \
"v14") \
export PGAUDIT_VERSION=1.6.2 \
export PGAUDIT_CHECKSUM=1f350d70a0cbf488c0f2b485e3a5c9b11f78ad9e3cbb95ef6904afa1eb3187eb \
;; \
"v15") \
export PGAUDIT_VERSION=1.7.0 \
export PGAUDIT_CHECKSUM=8f4a73e451c88c567e516e6cba7dc1e23bc91686bb6f1f77f8f3126d428a8bd8 \
;; \
"v16") \
export PGAUDIT_VERSION=16.0 \
export PGAUDIT_CHECKSUM=d53ef985f2d0b15ba25c512c4ce967dce07b94fd4422c95bd04c4c1a055fe738 \
;; \
"v17") \
export PGAUDIT_VERSION=17.0 \
export PGAUDIT_CHECKSUM=7d0d08d030275d525f36cd48b38c6455f1023da863385badff0cec44965bfd8c \
;; \
*) \
echo "pgaudit is not supported on this PostgreSQL version" && exit 1;; \
esac && \
wget https://github.com/pgaudit/pgaudit/archive/refs/tags/${PGAUDIT_VERSION}.tar.gz -O pgaudit.tar.gz && \
echo "${PGAUDIT_CHECKSUM} pgaudit.tar.gz" | sha256sum --check && \
mkdir pgaudit-src && cd pgaudit-src && tar xzf ../pgaudit.tar.gz --strip-components=1 -C .
FROM pg-build AS pgaudit-build
COPY --from=pgaudit-src /ext-src/ /ext-src/
WORKDIR /ext-src/pgaudit-src
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
#########################################################################################
#
# Layer "pgauditlogtofile"
# compile pgauditlogtofile extension
#
#########################################################################################
FROM build-deps AS pgauditlogtofile-src
ARG PG_VERSION
WORKDIR /ext-src
RUN case "${PG_VERSION}" in \
"v14" | "v15" | "v16" | "v17") \
export PGAUDITLOGTOFILE_VERSION=v1.6.4 \
export PGAUDITLOGTOFILE_CHECKSUM=ef801eb09c26aaa935c0dabd92c81eb9ebe338930daa9674d420a280c6bc2d70 \
;; \
*) \
echo "pgauditlogtofile is not supported on this PostgreSQL version" && exit 1;; \
esac && \
wget https://github.com/fmbiete/pgauditlogtofile/archive/refs/tags/${PGAUDITLOGTOFILE_VERSION}.tar.gz -O pgauditlogtofile.tar.gz && \
echo "${PGAUDITLOGTOFILE_CHECKSUM} pgauditlogtofile.tar.gz" | sha256sum --check && \
mkdir pgauditlogtofile-src && cd pgauditlogtofile-src && tar xzf ../pgauditlogtofile.tar.gz --strip-components=1 -C .
FROM pg-build AS pgauditlogtofile-build
COPY --from=pgauditlogtofile-src /ext-src/ /ext-src/
WORKDIR /ext-src/pgauditlogtofile-src
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
#########################################################################################
#
# Layer "neon-ext-build"
@@ -1669,14 +1577,7 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake
# also depends on libduckdb, but a different version.
#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#
@@ -1768,6 +1669,29 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
#########################################################################################
#
# Layer "awscli"
#
#########################################################################################
FROM build-deps AS awscli
ARG TARGETARCH
RUN set -ex; \
if [ "${TARGETARCH}" = "amd64" ]; then \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
#########################################################################################
#
# Clean up postgres folder before inclusion
@@ -1848,20 +1772,14 @@ COPY --from=pg_semver-src /ext-src/ /ext-src/
COPY --from=pg_ivm-src /ext-src/ /ext-src/
COPY --from=pg_partman-src /ext-src/ /ext-src/
#COPY --from=pg_mooncake-src /ext-src/ /ext-src/
COPY --from=pg_repack-src /ext-src/ /ext-src/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/pg_repack.patch /ext-src
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
#COPY --from=pg_repack-src /ext-src/ /ext-src/
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres
ENV PG_VERSION=${PG_VERSION:?}
#########################################################################################
#
@@ -1943,6 +1861,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
mkdir /usr/local/download_extensions && \
chown -R postgres:postgres /usr/local/download_extensions
# aws cli is used by fast_import
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
# pgbouncer and its config
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini

View File

@@ -1,11 +0,0 @@
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..af60106 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;

View File

@@ -1,72 +0,0 @@
diff --git a/regress/Makefile b/regress/Makefile
index bf6edcb..89b4c7f 100644
--- a/regress/Makefile
+++ b/regress/Makefile
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
# Test suite
#
-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
+++ b/regress/expected/nosuper.out
@@ -4,22 +4,22 @@
SET client_min_messages = error;
DROP ROLE IF EXISTS nosuper;
SET client_min_messages = warning;
-CREATE ROLE nosuper WITH LOGIN;
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
-- => OK
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
INFO: repacking table "public.tbl_cluster"
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
ERROR: pg_repack failed with error: You must be a superuser to use pg_repack
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
ERROR: pg_repack failed with error: ERROR: permission denied for schema repack
LINE 1: select repack.version(), repack.version_sql()
^
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
GRANT USAGE ON SCHEMA repack TO nosuper;
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql
+++ b/regress/sql/nosuper.sql
@@ -4,19 +4,19 @@
SET client_min_messages = error;
DROP ROLE IF EXISTS nosuper;
SET client_min_messages = warning;
-CREATE ROLE nosuper WITH LOGIN;
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
-- => OK
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
GRANT USAGE ON SCHEMA repack TO nosuper;
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
REVOKE ALL ON ALL TABLES IN SCHEMA repack FROM nosuper;
REVOKE USAGE ON SCHEMA repack FROM nosuper;

View File

@@ -47,9 +47,7 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -14,7 +14,6 @@ base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-sdk-kms.workspace = true
aws-smithy-types.workspace = true
anyhow.workspace = true
axum = { workspace = true, features = [] }
camino.workspace = true
@@ -25,6 +24,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true
@@ -47,12 +47,13 @@ tokio-postgres.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
uuid.workspace = true
walkdir.workspace = true
prometheus.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -41,6 +41,7 @@ use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::time::SystemTime;
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -85,6 +86,19 @@ fn parse_remote_ext_config(arg: &str) -> Result<String> {
}
}
/// Generate a compute ID if one is not supplied. This exists to keep forward
/// compatibility tests working, but will be removed in a future iteration.
fn generate_compute_id() -> String {
let now = SystemTime::now();
format!(
"compute-{}",
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
)
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
@@ -98,13 +112,16 @@ struct Cli {
/// outside the compute will talk to the compute through this port. Keep
/// the previous name for this argument around for a smoother release
/// with the control plane.
#[arg(long, default_value_t = 3080)]
///
/// TODO: Remove the alias after the control plane release which teaches the
/// control plane about the renamed argument.
#[arg(long, alias = "http-port", default_value_t = 3080)]
pub external_http_port: u16,
/// The port to bind the internal listening HTTP server to. Clients include
/// The port to bind the internal listening HTTP server to. Clients like
/// the neon extension (for installing remote extensions) and local_proxy.
#[arg(long, default_value_t = 3081)]
pub internal_http_port: u16,
#[arg(long)]
pub internal_http_port: Option<u16>,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
@@ -139,7 +156,7 @@ struct Cli {
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
pub compute_id: String,
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
@@ -342,7 +359,7 @@ fn wait_spec(
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
@@ -366,7 +383,7 @@ fn wait_spec(
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port).launch(&compute);
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
if !spec_set {
// No spec provided, hang waiting for it.

View File

@@ -25,10 +25,10 @@
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
use anyhow::{bail, Context};
use anyhow::Context;
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use clap::Parser;
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
use nix::unistd::Pid;
use tracing::{error, info, info_span, warn, Instrument};
@@ -44,59 +44,32 @@ mod s3_uri;
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300);
#[derive(Subcommand, Debug)]
enum Command {
/// Runs local postgres (neon binary), restores into it,
/// uploads pgdata to s3 to be consumed by pageservers
Pgdata {
/// Raw connection string to the source database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
source_connection_string: Option<String>,
/// If specified, will not shut down the local postgres after the import. Used in local testing
#[clap(short, long)]
interactive: bool,
/// Port to run postgres on. Default is 5432.
#[clap(long, default_value_t = 5432)]
pg_port: u16, // port to run postgres on, 5432 is default
/// Number of CPUs in the system. This is used to configure # of
/// parallel worker processes, for index creation.
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
num_cpus: Option<usize>,
/// Amount of RAM in the system. This is used to configure shared_buffers
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
},
/// Runs pg_dump-pg_restore from source to destination without running local postgres.
DumpRestore {
/// Raw connection string to the source database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
source_connection_string: Option<String>,
/// Raw connection string to the destination database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
destination_connection_string: Option<String>,
},
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, env = "NEON_IMPORTER_WORKDIR")]
#[clap(long)]
working_directory: Utf8PathBuf,
#[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
s3_prefix: Option<s3_uri::S3Uri>,
#[clap(long, env = "NEON_IMPORTER_PG_BIN_DIR")]
#[clap(long)]
source_connection_string: Option<String>,
#[clap(short, long)]
interactive: bool,
#[clap(long)]
pg_bin_dir: Utf8PathBuf,
#[clap(long, env = "NEON_IMPORTER_PG_LIB_DIR")]
#[clap(long)]
pg_lib_dir: Utf8PathBuf,
#[clap(long)]
pg_port: Option<u16>, // port to run postgres on, 5432 is default
#[clap(subcommand)]
command: Command,
/// Number of CPUs in the system. This is used to configure # of
/// parallel worker processes, for index creation.
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
num_cpus: Option<usize>,
/// Amount of RAM in the system. This is used to configure shared_buffers
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
}
#[serde_with::serde_as]
@@ -105,8 +78,6 @@ struct Spec {
encryption_secret: EncryptionSecret,
#[serde_as(as = "serde_with::base64::Base64")]
source_connstring_ciphertext_base64: Vec<u8>,
#[serde_as(as = "Option<serde_with::base64::Base64>")]
destination_connstring_ciphertext_base64: Option<Vec<u8>>,
}
#[derive(serde::Deserialize)]
@@ -122,150 +93,192 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
"C.UTF-8"
};
async fn decode_connstring(
kms_client: &aws_sdk_kms::Client,
key_id: &String,
connstring_ciphertext_base64: Vec<u8>,
) -> Result<String, anyhow::Error> {
let mut output = kms_client
.decrypt()
.key_id(key_id)
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
connstring_ciphertext_base64,
))
.send()
.await
.context("decrypt connection string")?;
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,
)?;
let plaintext = output
.plaintext
.take()
.context("get plaintext connection string")?;
info!("starting");
String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8")
}
let args = Args::parse();
struct PostgresProcess {
pgdata_dir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pgbin: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
postgres_proc: Option<tokio::process::Child>,
}
impl PostgresProcess {
fn new(pgdata_dir: Utf8PathBuf, pg_bin_dir: Utf8PathBuf, pg_lib_dir: Utf8PathBuf) -> Self {
Self {
pgdata_dir,
pgbin: pg_bin_dir.join("postgres"),
pg_bin_dir,
pg_lib_dir,
postgres_proc: None,
}
// Validate arguments
if args.s3_prefix.is_none() && args.source_connection_string.is_none() {
anyhow::bail!("either s3_prefix or source_connection_string must be specified");
}
if args.s3_prefix.is_some() && args.source_connection_string.is_some() {
anyhow::bail!("only one of s3_prefix or source_connection_string can be specified");
}
async fn prepare(&self, initdb_user: &str) -> Result<(), anyhow::Error> {
tokio::fs::create_dir(&self.pgdata_dir)
.await
.context("create pgdata directory")?;
let working_directory = args.working_directory;
let pg_bin_dir = args.pg_bin_dir;
let pg_lib_dir = args.pg_lib_dir;
let pg_port = args.pg_port.unwrap_or_else(|| {
info!("pg_port not specified, using default 5432");
5432
});
let pg_version = match get_pg_version(self.pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
PostgresMajorVersion::V17 => 17,
// Initialize AWS clients only if s3_prefix is specified
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let kms = aws_sdk_kms::Client::new(&config);
(Some(config), Some(kms))
} else {
(None, None)
};
// Get source connection string either from S3 spec or direct argument
let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
let spec: Spec = {
let spec_key = s3_prefix.append("/spec.json");
let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
let object = s3_client
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
};
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser: initdb_user,
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
pg_version,
initdb_bin: self.pg_bin_dir.join("initdb").as_ref(),
library_search_path: &self.pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &self.pgdata_dir,
})
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let mut output = kms_client
.unwrap()
.decrypt()
.key_id(key_id)
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
spec.source_connstring_ciphertext_base64,
))
.send()
.await
.context("decrypt source connection string")?;
let plaintext = output
.plaintext
.take()
.context("get plaintext source connection string")?;
String::from_utf8(plaintext.into_inner())
.context("parse source connection string as utf8")?
}
}
} else {
args.source_connection_string.unwrap()
};
match tokio::fs::create_dir(&working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&working_directory)
.await
.context("check if working directory is empty")?
{
anyhow::bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
let pgdata_dir = working_directory.join("pgdata");
tokio::fs::create_dir(&pgdata_dir)
.await
.context("initdb")
}
.context("create pgdata directory")?;
async fn start(
&mut self,
initdb_user: &str,
port: u16,
nproc: usize,
memory_mb: usize,
) -> Result<&tokio::process::Child, anyhow::Error> {
self.prepare(initdb_user).await?;
let pgbin = pg_bin_dir.join("postgres");
let pg_version = match get_pg_version(pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
PostgresMajorVersion::V17 => 17,
};
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser,
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
pg_version,
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &pgdata_dir,
})
.await
.context("initdb")?;
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
// available for misc other stuff that PostgreSQL uses memory for.
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
// If the caller didn't specify CPU / RAM to use for sizing, default to
// number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
let memory_mb = args.memory_mb.unwrap_or(256);
//
// Launch postgres process
//
let mut proc = tokio::process::Command::new(&self.pgbin)
.arg("-D")
.arg(&self.pgdata_dir)
.args(["-p", &format!("{port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args([
"-c",
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
.args(["-c", &format!("max_worker_processes={nproc}")])
.args(["-c", "effective_io_concurrency=100"])
.env_clear()
.env("LD_LIBRARY_PATH", &self.pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn postgres")?;
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
// available for misc other stuff that PostgreSQL uses memory for.
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
info!("spawned postgres, waiting for it to become ready");
tokio::spawn(
child_stdio_to_log::relay_process_output(proc.stdout.take(), proc.stderr.take())
.instrument(info_span!("postgres")),
);
self.postgres_proc = Some(proc);
Ok(self.postgres_proc.as_ref().unwrap())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
let proc: &mut tokio::process::Child = self.postgres_proc.as_mut().unwrap();
info!("shutdown postgres");
nix::sys::signal::kill(
Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")),
nix::sys::signal::SIGTERM,
//
// Launch postgres process
//
let mut postgres_proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-p", &format!("{pg_port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args([
"-c",
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
.args(["-c", &format!("max_worker_processes={nproc}")])
.args([
"-c",
&format!(
"effective_io_concurrency={}",
if cfg!(target_os = "macos") { 0 } else { 100 }
),
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.context("signal postgres to shut down")?;
proc.wait()
.await
.context("wait for postgres to shut down")
.map(|_| ())
}
}
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn postgres")?;
info!("spawned postgres, waiting for it to become ready");
tokio::spawn(
child_stdio_to_log::relay_process_output(
postgres_proc.stdout.take(),
postgres_proc.stderr.take(),
)
.instrument(info_span!("postgres")),
);
async fn wait_until_ready(connstring: String, create_dbname: String) {
// Create neondb database in the running postgres
let restore_pg_connstring =
format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
let start_time = std::time::Instant::now();
loop {
@@ -276,12 +289,7 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
std::process::exit(1);
}
match tokio_postgres::connect(
&connstring.replace("dbname=neondb", "dbname=postgres"),
tokio_postgres::NoTls,
)
.await
{
match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
Ok((client, connection)) => {
// Spawn the connection handling task to maintain the connection
tokio::spawn(async move {
@@ -290,12 +298,9 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
}
});
match client
.simple_query(format!("CREATE DATABASE {create_dbname};").as_str())
.await
{
match client.simple_query("CREATE DATABASE neondb;").await {
Ok(_) => {
info!("created {} database", create_dbname);
info!("created neondb database");
break;
}
Err(e) => {
@@ -319,16 +324,10 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
}
}
}
}
async fn run_dump_restore(
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
source_connstring: String,
destination_connstring: String,
) -> Result<(), anyhow::Error> {
let dumpdir = workdir.join("dumpdir");
let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
let dumpdir = working_directory.join("dumpdir");
let common_args = [
// schema mapping (prob suffices to specify them on one side)
@@ -357,18 +356,10 @@ async fn run_dump_restore(
.arg("--no-sync")
// POSITIONAL args
// source db (db name included in connection string)
.arg(&source_connstring)
.arg(&source_connection_string)
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -385,31 +376,24 @@ async fn run_dump_restore(
let st = pg_dump.wait().await.context("wait for pg_dump")?;
info!(status=?st, "pg_dump exited");
if !st.success() {
error!(status=%st, "pg_dump failed, restore will likely fail as well");
bail!("pg_dump failed");
warn!(status=%st, "pg_dump failed, restore will likely fail as well");
}
}
// TODO: maybe do it in a streaming way, plenty of internal research done on this already
// TODO: do it in a streaming way, plenty of internal research done on this already
// TODO: do the unlogged table trick
info!("restore from working directory into vanilla postgres");
{
let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
.args(&common_args)
.arg("-d")
.arg(&destination_connstring)
.arg(&restore_pg_connstring)
// POSITIONAL args
.arg(&dumpdir)
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -427,259 +411,48 @@ async fn run_dump_restore(
let st = pg_restore.wait().await.context("wait for pg_restore")?;
info!(status=?st, "pg_restore exited");
if !st.success() {
error!(status=%st, "pg_restore failed, restore will likely fail as well");
bail!("pg_restore failed");
warn!(status=%st, "pg_restore failed, restore will likely fail as well");
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn cmd_pgdata(
s3_client: Option<aws_sdk_s3::Client>,
kms_client: Option<aws_sdk_kms::Client>,
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
interactive: bool,
pg_port: u16,
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
num_cpus: Option<usize>,
memory_mb: Option<usize>,
) -> Result<(), anyhow::Error> {
if maybe_spec.is_none() && source_connection_string.is_none() {
bail!("spec must be provided for pgdata command");
}
if maybe_spec.is_some() && source_connection_string.is_some() {
bail!("only one of spec or source_connection_string can be provided");
}
let source_connection_string = if let Some(spec) = maybe_spec {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?
}
}
} else {
source_connection_string.unwrap()
};
let superuser = "cloud_admin";
let destination_connstring = format!(
"host=localhost port={} user={} dbname=neondb",
pg_port, superuser
);
let pgdata_dir = workdir.join("pgdata");
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());
let nproc = num_cpus.unwrap_or_else(num_cpus::get);
let memory_mb = memory_mb.unwrap_or(256);
proc.start(superuser, pg_port, nproc, memory_mb).await?;
wait_until_ready(destination_connstring.clone(), "neondb".to_string()).await;
run_dump_restore(
workdir.clone(),
pg_bin_dir,
pg_lib_dir,
source_connection_string,
destination_connstring,
)
.await?;
// If interactive mode, wait for Ctrl+C
if interactive {
if args.interactive {
info!("Running in interactive mode. Press Ctrl+C to shut down.");
tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
}
proc.shutdown().await?;
info!("shutdown postgres");
{
nix::sys::signal::kill(
Pid::from_raw(
i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
),
nix::sys::signal::SIGTERM,
)
.context("signal postgres to shut down")?;
postgres_proc
.wait()
.await
.context("wait for postgres to shut down")?;
}
// Only sync if s3_prefix was specified
if let Some(s3_prefix) = maybe_s3_prefix {
if let Some(s3_prefix) = args.s3_prefix {
info!("upload pgdata");
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
Utf8Path::new(&pgdata_dir),
&s3_prefix.append("/pgdata/"),
)
.await
.context("sync dump directory to destination")?;
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
info!("write status");
{
let status_dir = workdir.join("status");
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
&status_dir,
&s3_prefix.append("/status/"),
)
.await
.context("sync status directory to destination")?;
}
}
Ok(())
}
async fn cmd_dumprestore(
kms_client: Option<aws_sdk_kms::Client>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
destination_connection_string: Option<String>,
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
) -> Result<(), anyhow::Error> {
let (source_connstring, destination_connstring) = if let Some(spec) = maybe_spec {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let source = decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?;
let dest = if let Some(dest_ciphertext) =
spec.destination_connstring_ciphertext_base64
{
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await?
} else {
bail!("destination connection string must be provided in spec for dump_restore command");
};
(source, dest)
}
}
} else {
(
source_connection_string.unwrap(),
if let Some(val) = destination_connection_string {
val
} else {
bail!("destination connection string must be provided for dump_restore command");
},
)
};
run_dump_restore(
workdir,
pg_bin_dir,
pg_lib_dir,
source_connstring,
destination_connstring,
)
.await
}
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Json,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,
)?;
info!("starting");
let args = Args::parse();
// Initialize AWS clients only if s3_prefix is specified
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let s3_client = aws_sdk_s3::Client::new(&config);
let kms = aws_sdk_kms::Client::new(&config);
(Some(s3_client), Some(kms))
} else {
(None, None)
};
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
match args.command {
Command::Pgdata {
source_connection_string,
interactive,
pg_port,
num_cpus,
memory_mb,
} => {
cmd_pgdata(
s3_client,
kms_client,
args.s3_prefix,
spec,
source_connection_string,
interactive,
pg_port,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
)
.await?;
}
Command::DumpRestore {
source_connection_string,
destination_connection_string,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
)
.await?;
.context("sync status directory to destination")?;
}
}

View File

@@ -1,102 +1,24 @@
use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use walkdir::WalkDir;
use anyhow::Context;
use camino::Utf8Path;
use super::s3_uri::S3Uri;
use tracing::{info, warn};
const MAX_PARALLEL_UPLOADS: usize = 10;
/// Upload all files from 'local' to 'remote'
pub(crate) async fn upload_dir_recursive(
s3_client: &aws_sdk_s3::Client,
local: &Utf8Path,
remote: &S3Uri,
) -> anyhow::Result<()> {
// Recursively scan directory
let mut dirwalker = WalkDir::new(local)
.into_iter()
.map(|entry| {
let entry = entry?;
let file_type = entry.file_type();
let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
Ok((file_type, path))
})
.filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
match e {
Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
Ok((file_type, _path)) if file_type.is_dir() => {
// The WalkDir iterator will recurse into directories, but we don't want
// to do anything with directories as such. There's no concept of uploading
// an empty directory to S3.
None
}
Ok((file_type, path)) if file_type.is_symlink() => {
// huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
warn!("cannot upload symlink ({})", path);
None
}
Ok((_file_type, path)) => {
// should not happen
warn!("directory entry has unexpected type ({})", path);
None
}
Err(e) => Some(Err(e)),
}
});
// Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
// parallel.
let mut joinset = JoinSet::new();
loop {
// Could we upload more?
while joinset.len() < MAX_PARALLEL_UPLOADS {
if let Some(full_local_path) = dirwalker.next() {
let full_local_path = full_local_path?;
let relative_local_path = full_local_path
.strip_prefix(local)
.expect("all paths start from the walkdir root");
let remote_path = remote.append(relative_local_path.as_str());
info!(
"starting upload of {} to {}",
&full_local_path, &remote_path
);
let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
joinset.spawn(upload_task);
} else {
info!("draining upload tasks");
break;
}
}
// Wait for an upload to complete
if let Some(res) = joinset.join_next().await {
let _ = res?;
} else {
// all done!
break;
}
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("aws");
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn aws s3 sync")?
.wait()
.await
.context("wait for aws s3 sync")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("aws s3 sync failed"))
}
Ok(())
}
pub(crate) async fn upload_file(
s3_client: aws_sdk_s3::Client,
local_path: Utf8PathBuf,
remote: S3Uri,
) -> anyhow::Result<()> {
use aws_smithy_types::byte_stream::ByteStream;
let stream = ByteStream::from_path(&local_path).await?;
let _result = s3_client
.put_object()
.bucket(remote.bucket)
.key(&remote.key)
.body(stream)
.send()
.await?;
info!("upload of {} to {} finished", &local_path, &remote.key);
Ok(())
}

View File

@@ -2,7 +2,6 @@ DO $$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);

View File

@@ -46,8 +46,6 @@ use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
@@ -61,7 +59,6 @@ use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -84,10 +81,8 @@ pub struct EndpointConf {
internal_http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
}
//
@@ -184,9 +179,7 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
});
ep.create_endpoint_dir()?;
@@ -203,9 +196,7 @@ impl ComputeControlPlane {
pg_version,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
})?,
)?;
std::fs::write(
@@ -270,11 +261,8 @@ pub struct Endpoint {
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
reconfigure_concurrency: usize,
// Feature flags
features: Vec<ComputeFeature>,
// Cluster settings
cluster: Option<Cluster>,
}
#[derive(PartialEq, Eq)]
@@ -314,8 +302,6 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
debug!("serialized endpoint conf: {:?}", conf);
Ok(Endpoint {
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
external_http_address: SocketAddr::new(
@@ -333,10 +319,8 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
cluster: conf.cluster,
})
}
@@ -623,7 +607,7 @@ impl Endpoint {
};
// Create spec file
let mut spec = ComputeSpec {
let spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
@@ -656,7 +640,7 @@ impl Endpoint {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf.clone()),
postgresql_conf: Some(postgresql_conf),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
@@ -669,35 +653,9 @@ impl Endpoint {
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: self.reconfigure_concurrency,
reconfigure_concurrency: 1,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
};
// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();
debug!("spec.cluster {:?}", spec.cluster);
// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -715,14 +673,18 @@ impl Endpoint {
println!("Also at '{}'", conn_str);
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
//cmd.args([
// "--external-http-port",
// &self.external_http_address.port().to_string(),
//])
//.args([
// "--internal-http-port",
// &self.internal_http_address.port().to_string(),
//])
cmd.args([
"--external-http-port",
"--http-port",
&self.external_http_address.port().to_string(),
])
.args([
"--internal-http-port",
&self.internal_http_address.port().to_string(),
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.args([
@@ -739,16 +701,20 @@ impl Endpoint {
])
// TODO: It would be nice if we generated compute IDs with the same
// algorithm as the real control plane.
.args([
"--compute-id",
&format!(
"compute-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
),
])
//
// TODO: Add this back when
// https://github.com/neondatabase/neon/pull/10747 is merged.
//
//.args([
// "--compute-id",
// &format!(
// "compute-{}",
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs()
// ),
//])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);

View File

@@ -22,7 +22,7 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::id::{NodeId, TenantId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
@@ -47,9 +47,6 @@ enum Command {
listen_http_addr: String,
#[arg(long)]
listen_http_port: u16,
#[arg(long)]
listen_https_port: Option<u16>,
#[arg(long)]
availability_zone_id: String,
},
@@ -242,19 +239,6 @@ enum Command {
#[arg(long)]
scheduling_policy: SkSchedulingPolicyArg,
},
/// Downloads any missing heatmap layers for all shard for a given timeline
DownloadHeatmapLayers {
/// Tenant ID or tenant shard ID. When an unsharded tenant ID is specified,
/// the operation is performed on all shards. When a sharded tenant ID is
/// specified, the operation is only performed on the specified shard.
#[arg(long)]
tenant_shard_id: TenantShardId,
#[arg(long)]
timeline_id: TimelineId,
/// Optional: Maximum download concurrency (default is 16)
#[arg(long)]
concurrency: Option<usize>,
},
}
#[derive(Parser)]
@@ -397,7 +381,6 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id,
} => {
storcon_client
@@ -410,7 +393,6 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
}),
)
@@ -1265,24 +1247,6 @@ async fn main() -> anyhow::Result<()> {
String::from(scheduling_policy)
);
}
Command::DownloadHeatmapLayers {
tenant_shard_id,
timeline_id,
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
tenant_shard_id, timeline_id,
);
if let Some(c) = concurrency {
path = format!("{path}?concurrency={c}");
}
storcon_client
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
}
Ok(())

View File

@@ -77,5 +77,4 @@ echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
-S ${SPEC_FILE}

View File

@@ -81,8 +81,15 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in $FAILED $CONTRIB_FAILED; do
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
dn="$(basename $d)"
rm -rf $dn
mkdir $dn
docker cp $TEST_CONTAINER_NAME:$d/regression.diffs $dn || [ $? -eq 1 ]
docker cp $TEST_CONTAINER_NAME:$d/regression.out $dn || [ $? -eq 1 ]
cat $dn/regression.out $dn/regression.diffs || true
rm -rf $dn
done
rm -rf $FAILED
exit 1
fi
fi

View File

@@ -1,5 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./regress --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger

View File

@@ -1,24 +0,0 @@
diff --git a/test/sql/base.sql b/test/sql/base.sql
index 53adb30..2eed91b 100644
--- a/test/sql/base.sql
+++ b/test/sql/base.sql
@@ -2,7 +2,6 @@
BEGIN;
\i test/pgtap-core.sql
-CREATE EXTENSION semver;
SELECT plan(334);
--SELECT * FROM no_plan();
diff --git a/test/sql/corpus.sql b/test/sql/corpus.sql
index c0fe98e..39cdd2e 100644
--- a/test/sql/corpus.sql
+++ b/test/sql/corpus.sql
@@ -4,7 +4,6 @@ BEGIN;
-- Test the SemVer corpus from https://regex101.com/r/Ly7O1x/3/.
\i test/pgtap-core.sql
-CREATE EXTENSION semver;
SELECT plan(76);
--SELECT * FROM no_plan();

View File

@@ -1,7 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade-${PG_VERSION}.patch
psql -d contrib_regression -c "DROP EXTENSION IF EXISTS pgtap"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression base corpus

View File

@@ -1,16 +1,3 @@
diff --git a/Makefile b/Makefile
index f255fe6..0a0fa65 100644
--- a/Makefile
+++ b/Makefile
@@ -346,7 +346,7 @@ test: test-serial test-parallel
TB_DIR = test/build
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
index ba355ed..7e250f5 100644
--- a/test/schedule/create.sql

View File

@@ -2,4 +2,5 @@
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
make installcheck
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing

View File

@@ -2,5 +2,4 @@
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
REGRESS="$(make -n installcheck | awk '{print substr($0,index($0,"init-extension")+15);}')"
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression ${REGRESS}
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression plv8 plv8-errors scalar_args inline json startup_pre startup varparam json_conv jsonb_conv window guc es6 arraybuffer composites currentresource startup_perms bytea find_function_perms memory_limits reset show array_spread regression dialect bigint procedure

View File

@@ -43,8 +43,7 @@ EXTENSIONS='[
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"},
{"extname": "pgtap", "extdir": "pgtap-src"},
{"extname": "pg_repack", "extdir": "pg_repack-src"}
{"extname": "pgtap", "extdir": "pgtap-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
@@ -60,8 +59,6 @@ wait_for_ready
docker compose cp ext-src neon-test-extensions:/
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"

View File

@@ -252,7 +252,7 @@ pub enum ComputeMode {
Replica,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Cluster {
pub cluster_id: Option<String>,
pub name: Option<String>,
@@ -283,7 +283,7 @@ pub struct DeltaOp {
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -292,7 +292,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -308,7 +308,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,

View File

@@ -2,6 +2,7 @@ use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use once_cell::sync::Lazy;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
@@ -57,30 +58,38 @@ pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
// Resolve the line and function for each location.
backtrace::resolve(loc.address as *mut c_void, |symbol| {
let Some(symbol_name) = symbol.name() else {
let Some(symname) = symbol.name() else {
return;
};
let mut name = symname.to_string();
let function_name = format!("{symbol_name:#}");
let functions_len = functions.len();
let function_id = functions
.entry(function_name)
.or_insert_with_key(|function_name| {
let function_id = functions_len as u64 + 1;
let system_name = String::from_utf8_lossy(symbol_name.as_bytes());
// Strip the Rust monomorphization suffix from the symbol name.
static SUFFIX_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex"));
if let Some(m) = SUFFIX_REGEX.find(&name) {
name.truncate(m.start());
}
let function_id = match functions.get(&name) {
Some(function) => function.id,
None => {
let id = functions.len() as u64 + 1;
let system_name = String::from_utf8_lossy(symname.as_bytes());
let filename = symbol
.filename()
.map(|path| path.to_string_lossy())
.unwrap_or(Cow::Borrowed(""));
Function {
id: function_id,
name: string_id(function_name),
let function = Function {
id,
name: string_id(&name),
system_name: string_id(&system_name),
filename: string_id(&filename),
..Default::default()
}
})
.id;
};
functions.insert(name, function);
id
}
};
loc.line.push(Line {
function_id,
line: symbol.lineno().unwrap_or(0) as i64,

View File

@@ -122,8 +122,6 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -353,7 +351,7 @@ pub struct TenantConfigToml {
/// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
/// `index_part.json`, and it cannot be reversed.
pub rel_size_v2_enabled: bool,
pub rel_size_v2_enabled: Option<bool>,
// gc-compaction related configs
/// Enable automatic gc-compaction trigger on this tenant.
@@ -523,7 +521,6 @@ impl Default for ConfigToml {
} else {
None
},
validate_wal_contiguity: None,
}
}
}
@@ -547,11 +544,10 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
// with this config, we can get a maximum peak compaction usage of 9 GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
// This value needs to be tuned to avoid OOM. We have 3/4 of the total CPU threads to do background works, that's 16*3/4=9 on
// most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole
// calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50;
pub const DEFAULT_COMPACTION_L0_FIRST: bool = false;
pub const DEFAULT_COMPACTION_L0_SEMAPHORE: bool = true;
@@ -637,7 +633,7 @@ impl Default for TenantConfigToml {
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
timeline_offloading: true,
wal_receiver_protocol_override: None,
rel_size_v2_enabled: false,
rel_size_v2_enabled: None,
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,

View File

@@ -57,7 +57,6 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub availability_zone_id: AvailabilityZone,
}
@@ -106,7 +105,6 @@ pub struct TenantLocateResponseShard {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
}
#[derive(Serialize, Deserialize)]
@@ -150,7 +148,6 @@ pub struct NodeDescribeResponse {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub listen_pg_addr: String,
pub listen_pg_port: u16,

View File

@@ -1,12 +1,10 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::Oid;
use postgres_ffi::RepOriginId;
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
use utils::const_assert;
use crate::reltag::{BlockNumber, RelTag, SlruKind};
@@ -51,64 +49,6 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
/// The key prefix of ReplOrigin keys.
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
/// The key prefix of db directory keys.
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
/// The key prefix of rel directory keys.
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RelDirExists {
Exists,
Removed,
}
#[derive(Debug)]
pub struct DecodeError;
impl fmt::Display for DecodeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "invalid marker")
}
}
impl std::error::Error for DecodeError {}
impl RelDirExists {
/// The value of the rel directory keys that indicates the existence of a relation.
const REL_EXISTS_MARKER: Bytes = Bytes::from_static(b"r");
pub fn encode(&self) -> Bytes {
match self {
Self::Exists => Self::REL_EXISTS_MARKER.clone(),
Self::Removed => SPARSE_TOMBSTONE_MARKER.clone(),
}
}
pub fn decode_option(data: Option<impl AsRef<[u8]>>) -> Result<Self, DecodeError> {
match data {
Some(marker) if marker.as_ref() == Self::REL_EXISTS_MARKER => Ok(Self::Exists),
// Any other marker is invalid
Some(_) => Err(DecodeError),
None => Ok(Self::Removed),
}
}
pub fn decode(data: impl AsRef<[u8]>) -> Result<Self, DecodeError> {
let data = data.as_ref();
if data == Self::REL_EXISTS_MARKER {
Ok(Self::Exists)
} else if data == SPARSE_TOMBSTONE_MARKER {
Ok(Self::Removed)
} else {
Err(DecodeError)
}
}
}
/// A tombstone in the sparse keyspace, which is an empty buffer.
pub const SPARSE_TOMBSTONE_MARKER: Bytes = Bytes::from_static(b"");
/// Check if the key falls in the range of metadata keys.
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -170,24 +110,6 @@ impl Key {
}
}
pub fn rel_dir_sparse_key_range() -> Range<Self> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// This function checks more extensively what keys we can take on the write path.
/// If a key beginning with 00 does not have a global/default tablespace OID, it
/// will be rejected on the write path.
@@ -518,36 +440,6 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
#[inline(always)]
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: relnode,
field5: forknum,
field6: 1,
}
}
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
} // it's fine to exclude the last key b/c we only use field6 == 1
}
#[inline(always)]
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
@@ -842,9 +734,9 @@ impl Key {
self.field1 == RELATION_SIZE_PREFIX
}
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
Key {
field1: AUX_KEY_PREFIX,
field2: 0,

View File

@@ -1080,7 +1080,8 @@ pub struct TenantInfo {
/// Opaque explanation if gc is being blocked.
///
/// Only looked up for the individual tenant detail, not the listing.
/// Only looked up for the individual tenant detail, not the listing. This is purely for
/// debugging, not included in openapi.
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_blocking: Option<String>,
}
@@ -1143,7 +1144,6 @@ pub struct TimelineInfo {
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
/// as it is easier to reason about.
#[serde(default)]
pub applied_gc_cutoff_lsn: Lsn,
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
@@ -1152,7 +1152,6 @@ pub struct TimelineInfo {
///
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
/// than this LSN, but new leases may not be taken out earlier than this LSN.
#[serde(default)]
pub min_readable_lsn: Lsn,
pub disk_consistent_lsn: Lsn,

View File

@@ -9,8 +9,6 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};
@@ -270,7 +268,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
}
pub struct PostgresBackend<IO> {
pub socket_fd: RawFd,
framed: MaybeWriteOnly<IO>,
pub state: ProtoState,
@@ -296,11 +293,9 @@ impl PostgresBackend<tokio::net::TcpStream> {
tls_config: Option<Arc<rustls::ServerConfig>>,
) -> io::Result<Self> {
let peer_addr = socket.peer_addr()?;
let socket_fd = socket.as_raw_fd();
let stream = MaybeTlsStream::Unencrypted(socket);
Ok(Self {
socket_fd,
framed: MaybeWriteOnly::Full(Framed::new(stream)),
state: ProtoState::Initialization,
auth_type,
@@ -312,7 +307,6 @@ impl PostgresBackend<tokio::net::TcpStream> {
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
pub fn new_from_io(
socket_fd: RawFd,
socket: IO,
peer_addr: SocketAddr,
auth_type: AuthType,
@@ -321,7 +315,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let stream = MaybeTlsStream::Unencrypted(socket);
Ok(Self {
socket_fd,
framed: MaybeWriteOnly::Full(Framed::new(stream)),
state: ProtoState::Initialization,
auth_type,

View File

@@ -278,7 +278,7 @@ pub fn generate_pg_control(
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64, bool)> {
) -> anyhow::Result<(Bytes, u64)> {
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),

View File

@@ -124,59 +124,23 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}
/// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
///
/// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
/// the pageserver. They use the same format as the PostgreSQL control file and the
/// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
/// 'lsn' is the LSN at which we're starting up.
///
/// Returns:
/// - pg_control file contents
/// - system_identifier, extracted from the persisted information
/// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
/// checkpoint at the given LSN
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64, bool)> {
) -> anyhow::Result<(Bytes, u64)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
//
// NB: In the checkpoint struct that we persist in the pageserver, we have a different
// convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
// 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
// the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
//
// We didn't always have this convention however, and old persisted records will have
// old REDO values that point to some old LSN.
//
// The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
// checkpoint record at that point in WAL, with no new WAL records after it. That case
// can be treated as starting from a clean shutdown. All other cases are treated as
// non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
// that distinction doesn't matter very much. As of this writing, it only affects
// whether the persisted pg_stats information can be used or not.
//
// In the Checkpoint struct in the returned pg_control file, the redo pointer is
// always set to the LSN we're starting at, to hint that no WAL replay is required.
// (There's some neon-specific code in Postgres startup to make that work, though.
// Just setting the redo pointer is not sufficient.)
let was_shutdown = Lsn(checkpoint.redo) == lsn;
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
// We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
// neon-specific code at postgres startup ignores the state stored in the control
// file, similar to archive recovery in standalone PostgreSQL. Similarly, the
// checkPoint pointer is ignored, so just set it to 0.
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
Ok((pg_control.encode(), pg_control.system_identifier))
}
pub fn get_current_timestamp() -> TimestampTz {

View File

@@ -5,15 +5,18 @@ edition = "2021"
license = "MIT/Apache-2.0"
[dependencies]
async-trait.workspace = true
bytes.workspace = true
byteorder.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log = "0.4"
parking_lot.workspace = true
percent-encoding = "2.0"
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }
tokio-util = { workspace = true, features = ["codec"] }
serde = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }

View File

@@ -10,8 +10,8 @@ use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, ToSql, Type};
use crate::{
query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
SimpleQueryMessage, Statement, Transaction, TransactionBuilder,
prepare, query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
@@ -54,18 +54,18 @@ impl Responses {
}
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [crate::prepare] module).
/// (corresponding to the queries in the [prepare] module).
#[derive(Default)]
struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY).
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// its fallback).
typeinfo_enum: Option<Statement>,
@@ -190,6 +190,26 @@ impl Client {
&self.inner
}
/// Creates a new prepared statement.
///
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
/// which are set when executed. Prepared statements can only be used with the connection that created them.
pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
self.prepare_typed(query, &[]).await
}
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
///
/// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
/// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
pub async fn prepare_typed(
&self,
query: &str,
parameter_types: &[Type],
) -> Result<Statement, Error> {
prepare::prepare(&self.inner, query, parameter_types).await
}
/// Executes a statement, returning a vector of the resulting rows.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
@@ -202,11 +222,14 @@ impl Client {
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
pub async fn query(
pub async fn query<T>(
&self,
statement: Statement,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, Error> {
) -> Result<Vec<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.query_raw(statement, slice_iter(params))
.await?
.try_collect()
@@ -227,15 +250,13 @@ impl Client {
/// Panics if the number of parameters provided does not match the number expected.
///
/// [`query`]: #method.query
pub async fn query_raw<'a, I>(
&self,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
pub async fn query_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let statement = statement.__convert().into_statement(self).await?;
query::query(&self.inner, statement, params).await
}
@@ -250,6 +271,55 @@ impl Client {
query::query_txt(&self.inner, statement, params).await
}
/// Executes a statement, returning the number of rows modified.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
pub async fn execute<T>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
{
self.execute_raw(statement, slice_iter(params)).await
}
/// The maximally flexible version of [`execute`].
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// [`execute`]: #method.execute
pub async fn execute_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let statement = statement.__convert().into_statement(self).await?;
query::execute(self.inner(), statement, params).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
///
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

View File

@@ -1,8 +1,7 @@
#![allow(async_fn_in_trait)]
use crate::query::RowStream;
use crate::types::Type;
use crate::{Client, Error, Transaction};
use async_trait::async_trait;
use postgres_protocol2::Oid;
mod private {
@@ -12,6 +11,7 @@ mod private {
/// A trait allowing abstraction over connections and transactions.
///
/// This trait is "sealed", and cannot be implemented outside of this crate.
#[async_trait]
pub trait GenericClient: private::Sealed {
/// Like `Client::query_raw_txt`.
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
@@ -26,6 +26,7 @@ pub trait GenericClient: private::Sealed {
impl private::Sealed for Client {}
#[async_trait]
impl GenericClient for Client {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
where
@@ -38,12 +39,14 @@ impl GenericClient for Client {
/// Query for type information
async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(self.inner(), oid).await
self.get_type(oid).await
}
}
impl private::Sealed for Transaction<'_> {}
#[async_trait]
#[allow(clippy::needless_lifetimes)]
impl GenericClient for Transaction<'_> {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
where

View File

@@ -14,6 +14,7 @@ pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
pub use crate::statement::{Column, Statement};
pub use crate::tls::NoTls;
pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
@@ -64,6 +65,7 @@ pub mod row;
mod simple_query;
mod statement;
pub mod tls;
mod to_statement;
mod transaction;
mod transaction_builder;
pub mod types;

View File

@@ -1,6 +1,7 @@
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::error::SqlState;
use crate::types::{Field, Kind, Oid, Type};
use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
@@ -12,6 +13,7 @@ use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub(crate) const TYPEINFO_QUERY: &str = "\
@@ -22,6 +24,14 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
// Range types weren't added until Postgres 9.2, so pg_range may not exist
const TYPEINFO_FALLBACK_QUERY: &str = "\
SELECT t.typname, t.typtype, t.typelem, NULL::OID, t.typbasetype, n.nspname, t.typrelid
FROM pg_catalog.pg_type t
INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
const TYPEINFO_ENUM_QUERY: &str = "\
SELECT enumlabel
FROM pg_catalog.pg_enum
@@ -29,6 +39,14 @@ WHERE enumtypid = $1
ORDER BY enumsortorder
";
// Postgres 9.0 didn't have enumsortorder
const TYPEINFO_ENUM_FALLBACK_QUERY: &str = "\
SELECT enumlabel
FROM pg_catalog.pg_enum
WHERE enumtypid = $1
ORDER BY oid
";
pub(crate) const TYPEINFO_COMPOSITE_QUERY: &str = "\
SELECT attname, atttypid
FROM pg_catalog.pg_attribute
@@ -38,13 +56,15 @@ AND attnum > 0
ORDER BY attnum
";
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
pub async fn prepare(
client: &Arc<InnerClient>,
name: &'static str,
query: &str,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
let buf = encode(client, &name, query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
match responses.next().await? {
@@ -85,11 +105,10 @@ pub async fn prepare(
fn prepare_rec<'a>(
client: &'a Arc<InnerClient>,
name: &'static str,
query: &'a str,
types: &'a [Type],
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
Box::pin(prepare(client, name, query, types))
Box::pin(prepare(client, query, types))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
@@ -173,8 +192,13 @@ async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Erro
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
let stmt = match prepare_rec(client, TYPEINFO_QUERY, &[]).await {
Ok(stmt) => stmt,
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => {
prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await?
}
Err(e) => return Err(e),
};
client.set_typeinfo(&stmt);
Ok(stmt)
@@ -195,8 +219,13 @@ async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement,
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_enum";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_ENUM_QUERY, &[]).await?;
let stmt = match prepare_rec(client, TYPEINFO_ENUM_QUERY, &[]).await {
Ok(stmt) => stmt,
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => {
prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
}
Err(e) => return Err(e),
};
client.set_typeinfo_enum(&stmt);
Ok(stmt)
@@ -226,8 +255,7 @@ async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<State
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_composite";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
let stmt = prepare_rec(client, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
client.set_typeinfo_composite(&stmt);
Ok(stmt)

View File

@@ -157,6 +157,49 @@ where
})
}
pub async fn execute<'a, I>(
client: &InnerClient,
statement: Statement,
params: I,
) -> Result<u64, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if log_enabled!(Level::Debug) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
} else {
encode(client, &statement, params)?
};
let mut responses = start(client, buf).await?;
let mut rows = 0;
loop {
match responses.next().await? {
Message::DataRow(_) => {}
Message::CommandComplete(body) => {
rows = body
.tag()
.map_err(Error::parse)?
.rsplit(' ')
.next()
.unwrap()
.parse()
.unwrap_or(0);
}
Message::EmptyQueryResponse => rows = 0,
Message::ReadyForQuery(_) => return Ok(rows),
_ => return Err(Error::unexpected_message()),
}
}
}
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

View File

@@ -13,7 +13,7 @@ use std::{
struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
}
@@ -22,7 +22,7 @@ impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::close(b'S', &self.name, buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
@@ -40,7 +40,7 @@ pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
@@ -55,14 +55,14 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
name: String::new(),
params,
columns,
}))
}
pub(crate) fn name(&self) -> &str {
self.0.name
&self.0.name
}
/// Returns the expected types of the statement's parameters.

View File

@@ -0,0 +1,57 @@
use crate::to_statement::private::{Sealed, ToStatementType};
use crate::Statement;
mod private {
use crate::{Client, Error, Statement};
pub trait Sealed {}
pub enum ToStatementType<'a> {
Statement(&'a Statement),
Query(&'a str),
}
impl ToStatementType<'_> {
pub async fn into_statement(self, client: &Client) -> Result<Statement, Error> {
match self {
ToStatementType::Statement(s) => Ok(s.clone()),
ToStatementType::Query(s) => client.prepare(s).await,
}
}
}
}
/// A trait abstracting over prepared and unprepared statements.
///
/// Many methods are generic over this bound, so that they support both a raw query string as well as a statement which
/// was prepared previously.
///
/// This trait is "sealed" and cannot be implemented by anything outside this crate.
pub trait ToStatement: Sealed {
#[doc(hidden)]
fn __convert(&self) -> ToStatementType<'_>;
}
impl ToStatement for Statement {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Statement(self)
}
}
impl Sealed for Statement {}
impl ToStatement for str {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Query(self)
}
}
impl Sealed for str {}
impl ToStatement for String {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Query(self)
}
}
impl Sealed for String {}

View File

@@ -9,43 +9,13 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
/// 1 is the first valid generation, 0 is used as
/// a placeholder before we fully migrate to generations.
pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
/// Number uniquely identifying safekeeper configuration.
/// Note: it is a part of sk control file.
///
/// Like tenant generations, but for safekeepers.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SafekeeperGeneration(u32);
impl SafekeeperGeneration {
pub const fn new(v: u32) -> Self {
Self(v)
}
#[track_caller]
pub fn previous(&self) -> Option<Self> {
Some(Self(self.0.checked_sub(1)?))
}
#[track_caller]
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn into_inner(self) -> u32 {
self.0
}
}
impl Display for SafekeeperGeneration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
pub type Generation = u32;
/// 1 is the first valid generation, 0 is used as
/// a placeholder before we fully migrate to generations.
pub const INVALID_GENERATION: Generation = 0;
pub const INITIAL_GENERATION: Generation = 1;
/// Membership is defined by ids so e.g. walproposer uses them to figure out
/// quorums, but we also carry host and port to give wp idea where to connect.
@@ -119,7 +89,7 @@ impl Display for MemberSet {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Configuration {
/// Unique id.
pub generation: SafekeeperGeneration,
pub generation: Generation,
/// Current members of the configuration.
pub members: MemberSet,
/// Some means it is a joint conf.

View File

@@ -282,18 +282,3 @@ pub struct TimelineTermBumpResponse {
pub struct SafekeeperUtilization {
pub timeline_count: u64,
}
/// pull_timeline request body.
#[derive(Debug, Deserialize, Serialize)]
pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PullTimelineResponse {
// Donor safekeeper host
pub safekeeper_host: String,
// TODO: add more fields?
}

View File

@@ -24,10 +24,11 @@ diatomic-waker.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
inferno.workspace = true
fail.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = { workspace = true, features = ["ioctl"] }
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
@@ -61,7 +62,6 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
pprof.workspace = true
serde_assert.workspace = true
tokio = { workspace = true, features = ["test-util"] }

View File

@@ -1,26 +0,0 @@
## Utils Benchmarks
To run benchmarks:
```sh
# All benchmarks.
cargo bench --package utils
# Specific file.
cargo bench --package utils --bench benchmarks
# Specific benchmark.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true
# List available benchmarks.
cargo bench --package utils --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.
Benchmarks are automatically compared against the previous run. To compare against other runs, see
`--baseline` and `--save-baseline`.

View File

@@ -1,18 +1,5 @@
use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use criterion::{criterion_group, criterion_main, Criterion};
use utils::id;
use utils::logging::warn_slow;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_id_stringify,
bench_warn_slow,
);
criterion_main!(benches);
pub fn bench_id_stringify(c: &mut Criterion) {
// Can only use public methods.
@@ -29,31 +16,5 @@ pub fn bench_id_stringify(c: &mut Criterion) {
});
}
pub fn bench_warn_slow(c: &mut Criterion) {
for enabled in [false, true] {
c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
run_bench(b, enabled).unwrap()
});
}
// The actual benchmark.
fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> {
const THRESHOLD: Duration = Duration::from_secs(1);
// Use a multi-threaded runtime to avoid thread parking overhead when yielding.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
// Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
// paths for a ready future.
if enabled {
b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
} else {
b.iter(|| runtime.block_on(tokio::task::yield_now()));
}
Ok(())
}
}
criterion_group!(benches, bench_id_stringify);
criterion_main!(benches);

View File

@@ -286,11 +286,6 @@ mod tests {
const SHORT2_ENC_LE: &[u8] = &[8, 0, 0, 3, 7];
const SHORT2_ENC_LE_TRAILING: &[u8] = &[8, 0, 0, 3, 7, 0xff, 0xff, 0xff];
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct NewTypeStruct(u32);
const NT1: NewTypeStruct = NewTypeStruct(414243);
const NT1_INNER: u32 = 414243;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LongMsg {
pub tag: u8,
@@ -413,42 +408,4 @@ mod tests {
let msg2 = LongMsg::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
#[test]
/// Ensure that newtype wrappers around u32 don't change the serialization format
fn be_nt() {
use super::BeSer;
assert_eq!(NT1.serialized_size().unwrap(), 4);
let msg = NT1;
let encoded = msg.ser().unwrap();
let expected = hex_literal::hex!("0006 5223");
assert_eq!(encoded, expected);
assert_eq!(encoded, NT1_INNER.ser().unwrap());
let msg2 = NewTypeStruct::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
#[test]
/// Ensure that newtype wrappers around u32 don't change the serialization format
fn le_nt() {
use super::LeSer;
assert_eq!(NT1.serialized_size().unwrap(), 4);
let msg = NT1;
let encoded = msg.ser().unwrap();
let expected = hex_literal::hex!("2352 0600");
assert_eq!(encoded, expected);
assert_eq!(encoded, NT1_INNER.ser().unwrap());
let msg2 = NewTypeStruct::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
}

View File

@@ -93,9 +93,6 @@ pub mod try_rcu;
pub mod guard_arc_swap;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;

View File

@@ -1,35 +0,0 @@
//! Linux-specific socket ioctls.
//!
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
use std::{
io,
mem::MaybeUninit,
os::{fd::RawFd, raw::c_int},
};
use nix::libc::{FIONREAD, TIOCOUTQ};
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, FIONREAD)
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, TIOCOUTQ)
}

View File

@@ -1,13 +1,9 @@
use std::future::Future;
use std::str::FromStr;
use std::time::Duration;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::warn;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -322,41 +318,6 @@ impl std::fmt::Debug for SecretString {
}
}
/// Logs a periodic warning if a future is slow to complete.
///
/// This is performance-sensitive as it's used on the GetPage read path.
#[inline]
pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut f = Box::pin(f);
let started = Instant::now();
let mut attempt = 1;
loop {
// NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
// case where the timeout doesn't fire.
let deadline = started + attempt * threshold;
if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
// NB: we check if we exceeded the threshold even if the timeout never fired, because
// scheduling or execution delays may cause the future to succeed even if it exceeds the
// timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
// false negatives.
let elapsed = started.elapsed();
if elapsed >= threshold {
warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
}
return output;
}
let elapsed = started.elapsed().as_secs_f64();
warn!("slow {name} still running after {elapsed:.3}s",);
attempt += 1;
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};

View File

@@ -117,10 +117,6 @@ impl TenantShardId {
)
}
pub fn range(&self) -> RangeInclusive<Self> {
RangeInclusive::new(*self, *self)
}
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}

View File

@@ -5,7 +5,6 @@ package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
optional uint64 raw_wal_start_lsn = 3;
}
message InterpretedWalRecord {

View File

@@ -60,11 +60,7 @@ pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Lsn,
// Inclusive start LSN of the PG WAL from which the interpreted
// WAL records were extracted. Note that this is not necessarily the
// start LSN of the first interpreted record in the batch.
pub raw_wal_start_lsn: Option<Lsn>,
pub next_record_lsn: Option<Lsn>,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver

View File

@@ -167,8 +167,7 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
.collect::<Result<Vec<_>, _>>()?;
Ok(proto::InterpretedWalRecords {
records,
next_record_lsn: Some(value.next_record_lsn.0),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
next_record_lsn: value.next_record_lsn.map(|l| l.0),
})
}
}
@@ -255,11 +254,7 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
Ok(InterpretedWalRecords {
records,
next_record_lsn: value
.next_record_lsn
.map(Lsn::from)
.expect("Always provided"),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
next_record_lsn: value.next_record_lsn.map(Lsn::from),
})
}
}

View File

@@ -477,26 +477,6 @@ impl Client {
self.request(Method::POST, &uri, ()).await.map(|_| ())
}
pub async fn timeline_download_heatmap_layers(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
concurrency: Option<usize>,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
))
.expect("Cannot build URL");
if let Some(concurrency) = concurrency {
path.query_pairs_mut()
.append_pair("concurrency", &format!("{}", concurrency));
}
self.request(Method::POST, path, ()).await.map(|_| ())
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -345,7 +345,6 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(3, 1) => AuxFileV2::Recognized("pg_stat/pgstat.stat", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,7 +39,6 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_PG_STAT: u8 = 0x03;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -54,7 +53,6 @@ const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// * pg_logical/replorigin_checkpoint -> 0x0103
/// * pg_logical/others -> 0x01FF
/// * pg_replslot/ -> 0x0201
/// * pg_stat/pgstat.stat -> 0x0301
/// * others -> 0xFFFF
///
/// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
@@ -77,8 +75,6 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_stat/") {
aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -13,7 +13,7 @@
use anyhow::{anyhow, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{rel_block_to_key, Key};
use pageserver_api::key::Key;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::{Instant, SystemTime};
@@ -264,31 +264,6 @@ where
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
// TODO include checksum
// Construct the pg_control file from the persisted checkpoint and pg_control
// information. But we only add this to the tarball at the end, so that if the
// writing is interrupted half-way through, the resulting incomplete tarball will
// be missing the pg_control file, which prevents PostgreSQL from starting up on
// it. With proper error handling, you should never try to start up from an
// incomplete basebackup in the first place, of course, but this is a nice little
// extra safety measure.
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed to get control bytes")?;
let (pg_control_bytes, system_identifier, was_shutdown) =
postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let pgversion = self.timeline.pg_version;
@@ -426,10 +401,6 @@ where
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
} else if path == "pg_stat/pgstat.stat" && !was_shutdown {
// Drop statistic in case of abnormal termination, i.e. if we're not starting from the exact LSN
// of a shutdown checkpoint.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
@@ -491,9 +462,8 @@ where
)))
});
// Last, add the pg_control file and bootstrap WAL segment.
self.add_pgcontrol_file(pg_control_bytes, system_identifier)
.await?;
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar
.finish()
.await
@@ -531,9 +501,13 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
// TODO: investigate using get_vectored for the entire startblk..endblk range.
// But this code path is not on the critical path for most basebackups (?).
.get(rel_block_to_key(src, blknum), self.lsn, self.ctx)
.get_rel_page_at_lsn(
src,
blknum,
Version::Lsn(self.lsn),
self.ctx,
self.io_concurrency.clone(),
)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
segment_data.extend_from_slice(&img[..]);
@@ -701,11 +675,7 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -728,6 +698,24 @@ where
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar

View File

@@ -134,7 +134,6 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

View File

@@ -197,10 +197,6 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,
/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}
/// Token for authentication to safekeepers
@@ -364,7 +360,6 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;
let mut conf = PageServerConf {
@@ -451,7 +446,6 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};
// ------------------------------------------------------------

View File

@@ -98,7 +98,6 @@ pub struct RequestContext {
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
read_path_debug: bool,
}
/// The kind of access to the page cache.
@@ -156,7 +155,6 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
},
}
}
@@ -170,7 +168,6 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
},
}
}
@@ -194,11 +191,6 @@ impl RequestContextBuilder {
self
}
pub(crate) fn read_path_debug(mut self, b: bool) -> Self {
self.inner.read_path_debug = b;
self
}
pub fn build(self) -> RequestContext {
self.inner
}
@@ -299,8 +291,4 @@ impl RequestContext {
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
}
pub(crate) fn read_path_debug(&self) -> bool {
self.read_path_debug
}
}

View File

@@ -173,7 +173,6 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: None, // TODO: Support https.
availability_zone_id: az_id.expect("Checked above"),
})
}

View File

@@ -824,38 +824,6 @@ paths:
schema:
$ref: "#/components/schemas/TenantConfigResponse"
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers:
parameters:
- name: tenant_shard_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
- name: concurrency
description: Maximum number of concurrent downloads (capped at remote storage concurrency)
in: query
required: false
schema:
type: integer
post:
description: |
Download all layers in the specified timeline's heatmap. The `tenant_shard_id` parameter
may be used to target all shards of a tenant when the unsharded form is used, or a specific
tenant shard with the sharded form.
responses:
"200":
description: Success
delete:
description: Stop any on-going background downloads of heatmap layers for the specified timeline.
responses:
"200":
description: Success
/v1/utilization:
get:
description: |
@@ -914,8 +882,6 @@ components:
properties:
reason:
type: string
gc_blocking:
type: string
TenantCreateRequest:
allOf:
@@ -1117,9 +1083,6 @@ components:
min_readable_lsn:
type: string
format: hex
latest_gc_cutoff_lsn:
type: string
format: hex
applied_gc_cutoff_lsn:
type: string
format: hex

View File

@@ -68,7 +68,6 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use crate::config::PageServerConf;
use crate::context::RequestContextBuilder;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -1464,59 +1463,6 @@ async fn timeline_layer_scan_disposable_keys(
)
}
async fn timeline_download_heatmap_layers_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// Only used in the case where remote storage is not configured.
const DEFAULT_MAX_CONCURRENCY: usize = 100;
// A conservative default.
const DEFAULT_CONCURRENCY: usize = 16;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let desired_concurrency =
parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY);
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let max_concurrency = get_config(&request)
.remote_storage_config
.as_ref()
.map(|c| c.concurrency_limit())
.unwrap_or(DEFAULT_MAX_CONCURRENCY);
let concurrency = std::cmp::min(max_concurrency, desired_concurrency);
timeline.start_heatmap_layers_download(concurrency).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn timeline_shutdown_download_heatmap_layers_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline.stop_and_drain_heatmap_layers_download().await;
json_response(StatusCode::OK, ())
}
async fn layer_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -2395,7 +2341,6 @@ async fn timeline_checkpoint_handler(
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e)
}
)?;
@@ -2573,30 +2518,14 @@ async fn deletion_queue_flush(
}
}
async fn getpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
getpage_at_lsn_handler_inner(false, request, cancel).await
}
async fn touchpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
getpage_at_lsn_handler_inner(true, request, cancel).await
}
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
async fn getpage_at_lsn_handler_inner(
touch: bool,
async fn getpage_at_lsn_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
// Require pageserver admin permission for this API instead of only tenant-level token.
check_permission(&request, None)?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
struct Key(pageserver_api::key::Key);
@@ -2611,29 +2540,22 @@ async fn getpage_at_lsn_handler_inner(
let key: Key = parse_query_param(&request, "key")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Enable read path debugging
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
// Use last_record_lsn if no lsn is provided
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let page = timeline.get(key.0, lsn, &ctx).await?;
if touch {
json_response(StatusCode::OK, ())
} else {
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
}
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
}
.instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
@@ -3704,14 +3626,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
|r| api_handler(r, layer_map_info_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
|r| api_handler(r, timeline_download_heatmap_layers_handler),
)
.delete(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
|r| api_handler(r, timeline_shutdown_download_heatmap_layers_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|r| api_handler(r, layer_download_handler),
@@ -3768,10 +3682,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",
|r| api_handler(r, touchpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| api_handler(r, timeline_collect_keyspace),

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
@@ -130,7 +129,7 @@ pub(crate) static LAYERS_PER_READ: Lazy<HistogramVec> = Lazy::new(|| {
"Layers visited to serve a single read (read amplification). In a batch, all visited layers count towards every read.",
&["tenant_id", "shard_id", "timeline_id"],
// Low resolution to reduce cardinality.
vec![4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0],
vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0],
)
.expect("failed to define a metric")
});
@@ -1440,66 +1439,27 @@ impl Drop for SmgrOpTimer {
}
impl SmgrOpFlushInProgress {
/// The caller must guarantee that `socket_fd`` outlives this function.
pub(crate) async fn measure<Fut, O>(
self,
started_at: Instant,
mut fut: Fut,
socket_fd: RawFd,
) -> O
pub(crate) async fn measure<Fut, O>(self, mut started_at: Instant, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
let mut logged = false;
let mut last_counter_increment_at = started_at;
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|is_timeout| {
|| {
let now = Instant::now();
// Increment counter
{
let elapsed_since_last_observe = now - last_counter_increment_at;
self.global_micros
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
last_counter_increment_at = now;
}
// Log something on every timeout, and on completion but only if we hit a timeout.
if is_timeout || logged {
logged = true;
let elapsed_total = now - started_at;
let msg = if is_timeout {
"slow flush ongoing"
} else {
"slow flush completed or cancelled"
};
let (inq, outq) = {
// SAFETY: caller guarantees that `socket_fd` outlives this function.
#[cfg(target_os = "linux")]
unsafe {
(
utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2),
utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2),
)
}
#[cfg(not(target_os = "linux"))]
{
_ = socket_fd; // appease unused lint on macOS
(-1, -1)
}
};
let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64());
tracing::info!(elapsed_total_secs, inq, outq, msg);
}
let elapsed = now - started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
started_at = now;
},
|mut observe| {
observe(false);
observe();
},
);
@@ -1507,7 +1467,7 @@ impl SmgrOpFlushInProgress {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)(true);
(*observe_guard)();
}
}
}

View File

@@ -34,13 +34,11 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::logging::warn_slow;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
@@ -75,7 +73,6 @@ use pageserver_api::models::PageTraceEvent;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
/// is not yet in state [`TenantState::Active`].
@@ -83,9 +80,6 @@ use std::os::fd::AsRawFd;
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log a warning about slow GetPage requests.
const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
///////////////////////////////////////////////////////////////////////////////
pub struct Listener {
@@ -242,7 +236,7 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr, application_name))]
#[instrument(skip_all, fields(peer_addr))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
@@ -263,8 +257,6 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr().context("get peer address")?;
tracing::Span::current().record("peer_addr", field::display(peer_addr));
@@ -313,7 +305,7 @@ async fn page_service_conn_main(
cancel.clone(),
gate_guard,
);
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend.run(&mut conn_handler, &cancel).await {
Ok(()) => {
@@ -599,7 +591,6 @@ struct BatchedTestRequest {
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
enum BatchedFeMessage {
Exists {
span: Span,
@@ -644,10 +635,6 @@ enum BatchedFeMessage {
}
impl BatchedFeMessage {
fn as_static_str(&self) -> &'static str {
self.into()
}
fn observe_execution_start(&mut self, at: Instant) {
match self {
BatchedFeMessage::Exists { timer, .. }
@@ -1299,15 +1286,12 @@ impl PageServerHandler {
))?;
// what we want to do
let socket_fd = pgb_writer.socket_fd;
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
Instant::now(),
flush_fut,
socket_fd,
)),
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
@@ -1473,20 +1457,17 @@ impl PageServerHandler {
}
};
let result = warn_slow(
msg.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
let err = self
.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
protocol_version,
ctx,
),
)
.await;
match result {
)
.await;
match err {
Ok(()) => {}
Err(e) => break e,
}
@@ -1649,17 +1630,13 @@ impl PageServerHandler {
return Err(e);
}
};
warn_slow(
batch.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
),
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
)
.await?;
}
@@ -1816,13 +1793,6 @@ impl PageServerHandler {
.as_millis()
.to_string()
});
info!(
"acquired lease for {} until {}",
lsn,
valid_until_str.as_deref().unwrap_or("<unknown>")
);
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
@@ -2487,16 +2457,9 @@ where
fn startup(
&mut self,
_pgb: &mut PostgresBackend<IO>,
sm: &FeStartupPacket,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
};
Ok(())
}

View File

@@ -23,14 +23,13 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY,
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::key::{rel_tag_sparse_key, Key};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -45,7 +44,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -491,33 +490,12 @@ impl Timeline {
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
// Read path: first read the new reldir keyspace. Early return if the relation exists.
// Otherwise, read the old reldir keyspace.
// TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
if self.get_rel_size_v2_enabled() {
// fetch directory listing (new)
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
let exists_v2 = buf == RelDirExists::Exists;
// Fast path: if the relation exists in the new format, return true.
// TODO: we should have a verification mode that checks both keyspaces
// to ensure the relation only exists in one of them.
if exists_v2 {
return Ok(true);
}
}
// fetch directory listing (old)
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
Ok(exists_v1)
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
/// Get a list of all existing relations in given tablespace and database.
@@ -535,12 +513,12 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing (old)
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let rels_v1: HashSet<RelTag> =
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
@@ -548,46 +526,6 @@ impl Timeline {
forknum: *forknum,
}));
if !self.get_rel_size_v2_enabled() {
return Ok(rels_v1);
}
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
);
let results = self
.scan(
KeySpace::single(key_range),
version.get_lsn(),
ctx,
io_concurrency,
)
.await?;
let mut rels = rels_v1;
for (key, val) in results {
let val = RelDirExists::decode(&val?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
assert_eq!(key.field6, 1);
assert_eq!(key.field2, spcnode);
assert_eq!(key.field3, dbnode);
let tag = RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
};
if val == RelDirExists::Removed {
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
continue;
}
let did_not_contain = rels.insert(tag);
debug_assert!(did_not_contain, "duplicate reltag in v2");
}
Ok(rels)
}
@@ -1206,11 +1144,7 @@ impl Timeline {
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![
Key::metadata_aux_key_range(),
repl_origin_key_range(),
Key::rel_dir_sparse_key_range(),
],
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
});
if cfg!(debug_assertions) {
@@ -1340,22 +1274,12 @@ pub struct DatadirModification<'a> {
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
pending_directory_entries: Vec<(DirectoryKind, usize)>,
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
pending_metadata_bytes: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricsUpdate {
/// Set the metrics to this value
Set(u64),
/// Increment the metrics by this value
Add(u64),
/// Decrement the metrics by this value
Sub(u64),
}
impl DatadirModification<'_> {
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
@@ -1435,8 +1359,7 @@ impl DatadirModification<'_> {
let buf = DbDirectory::ser(&DbDirectory {
dbdirs: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::Db, MetricsUpdate::Set(0)));
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
let buf = if self.tline.pg_version >= 17 {
@@ -1449,7 +1372,7 @@ impl DatadirModification<'_> {
})
}?;
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
.push((DirectoryKind::TwoPhase, 0));
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
@@ -1459,23 +1382,17 @@ impl DatadirModification<'_> {
// harmless but they'd just be dropped on later compaction.
if self.tline.tenant_shard_id.is_shard_zero() {
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::Clog),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(
slru_dir_to_key(SlruKind::MultiXactMembers),
empty_dir.clone(),
);
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::Clog),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
}
Ok(())
@@ -1741,16 +1658,10 @@ impl DatadirModification<'_> {
}
if r.is_none() {
// Create RelDirectory
// TODO: if we have fully migrated to v2, no need to create this directory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
if self.tline.get_rel_size_v2_enabled() {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
}
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
@@ -1774,10 +1685,8 @@ impl DatadirModification<'_> {
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid = xid as u32;
@@ -1785,10 +1694,8 @@ impl DatadirModification<'_> {
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
@@ -1837,10 +1744,8 @@ impl DatadirModification<'_> {
let mut dir = DbDirectory::des(&buf)?;
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
let buf = DbDirectory::ser(&dir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dir.dbdirs.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::Db, dir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
} else {
warn!(
@@ -1873,85 +1778,39 @@ impl DatadirModification<'_> {
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let dbdir_exists =
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
false
} else {
true
};
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if !dbdir_exists {
// Create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
if self.tline.get_rel_size_v2_enabled() {
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self
.sparse_get(sparse_rel_dir_key, ctx)
.await
.map_err(|e| RelationError::Other(e.into()))?;
let val = RelDirExists::decode_option(val)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
if val == RelDirExists::Exists {
return Err(RelationError::AlreadyExists);
}
self.put(
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
// We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
// TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
} else {
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
}
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
@@ -2037,34 +1896,9 @@ impl DatadirModification<'_> {
let mut dirty = false;
for rel_tag in rel_tags {
let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
true
} else if self.tline.get_rel_size_v2_enabled() {
// The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
// Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
// logic).
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
// put tombstone
self.put(key, Value::Image(RelDirExists::Removed.encode()));
// no need to set dirty to true
true
} else {
false
}
} else {
false
};
if found {
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
@@ -2080,6 +1914,8 @@ impl DatadirModification<'_> {
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
}
@@ -2103,10 +1939,8 @@ impl DatadirModification<'_> {
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
MetricsUpdate::Set(dir.segments.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -2153,10 +1987,8 @@ impl DatadirModification<'_> {
if !dir.segments.remove(&segno) {
warn!("slru segment {:?}/{} does not exist", kind, segno);
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
MetricsUpdate::Set(dir.segments.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -2188,10 +2020,8 @@ impl DatadirModification<'_> {
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
@@ -2200,10 +2030,8 @@ impl DatadirModification<'_> {
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
@@ -2264,13 +2092,6 @@ impl DatadirModification<'_> {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
// Compute may request delete of old version of pgstat AUX file if new one exceeds size limit.
// Compute doesn't know if previous version of this file exists or not, so
// attempt to delete non-existing file can cause this message.
// To avoid false alarms, log it as info rather than warning.
(None, true) if path.starts_with("pg_stat/") => {
info!("removing non-existing pg_stat file: {}", path)
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
@@ -2326,7 +2147,7 @@ impl DatadirModification<'_> {
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count);
writer.update_directory_entries_count(kind, count as u64);
}
Ok(())
@@ -2412,7 +2233,7 @@ impl DatadirModification<'_> {
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count);
writer.update_directory_entries_count(kind, count as u64);
}
self.pending_metadata_bytes = 0;
@@ -2476,22 +2297,6 @@ impl DatadirModification<'_> {
self.tline.get(key, lsn, ctx).await
}
/// Get a key from the sparse keyspace. Automatically converts the missing key error
/// and the empty value into None.
async fn sparse_get(
&self,
key: Key,
ctx: &RequestContext,
) -> Result<Option<Bytes>, PageReconstructError> {
let val = self.get(key, ctx).await;
match val {
Ok(val) if val.is_empty() => Ok(None),
Ok(val) => Ok(Some(val)),
Err(PageReconstructError::MissingKey(_)) => Ok(None),
Err(e) => Err(e),
}
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
@@ -2574,23 +2379,6 @@ impl Version<'_> {
}
}
/// Get a key from the sparse keyspace. Automatically converts the missing key error
/// and the empty value into None.
async fn sparse_get(
&self,
timeline: &Timeline,
key: Key,
ctx: &RequestContext,
) -> Result<Option<Bytes>, PageReconstructError> {
let val = self.get(timeline, key, ctx).await;
match val {
Ok(val) if val.is_empty() => Ok(None),
Ok(val) => Ok(Some(val)),
Err(PageReconstructError::MissingKey(_)) => Ok(None),
Err(e) => Err(e),
}
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,
@@ -2650,7 +2438,6 @@ pub(crate) enum DirectoryKind {
Rel,
AuxFiles,
SlruSegment(SlruKind),
RelV2,
}
impl DirectoryKind {

View File

@@ -3101,9 +3101,6 @@ impl Tenant {
if let Some(queue) = queue {
outcome = queue
.iteration(cancel, ctx, &self.gc_block, &timeline)
.instrument(
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
)
.await?;
}
}
@@ -3150,12 +3147,6 @@ impl Tenant {
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
CompactionError::CollectKeySpaceError(err) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::Other(err) => {
self.compaction_circuit_breaker
.lock()
@@ -3933,13 +3924,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_rel_size_v2_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.rel_size_v2_enabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
pub fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -5656,7 +5640,7 @@ pub(crate) mod harness {
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
timeline_offloading: Some(tenant_conf.timeline_offloading),
wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
rel_size_v2_enabled: Some(tenant_conf.rel_size_v2_enabled),
rel_size_v2_enabled: tenant_conf.rel_size_v2_enabled,
gc_compaction_enabled: Some(tenant_conf.gc_compaction_enabled),
gc_compaction_initial_threshold_kb: Some(
tenant_conf.gc_compaction_initial_threshold_kb,
@@ -7855,6 +7839,18 @@ mod tests {
}
tline.freeze_and_flush().await?;
// Force layers to L1
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
if iter % 5 == 0 {
let (_, before_delta_file_accessed) =
@@ -7867,6 +7863,7 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
@@ -8313,6 +8310,8 @@ mod tests {
let cancel = CancellationToken::new();
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
tline.force_set_disk_consistent_lsn(Lsn(0x40));
tline
.compact(
&cancel,
@@ -8326,8 +8325,7 @@ mod tests {
)
.await
.unwrap();
// Image layers are created at last_record_lsn
// Image layers are created at repartition LSN
let images = tline
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
.await

View File

@@ -485,9 +485,7 @@ impl TenantConfOpt {
wal_receiver_protocol_override: self
.wal_receiver_protocol_override
.or(global_conf.wal_receiver_protocol_override),
rel_size_v2_enabled: self
.rel_size_v2_enabled
.unwrap_or(global_conf.rel_size_v2_enabled),
rel_size_v2_enabled: self.rel_size_v2_enabled.or(global_conf.rel_size_v2_enabled),
gc_compaction_enabled: self
.gc_compaction_enabled
.unwrap_or(global_conf.gc_compaction_enabled),

View File

@@ -51,7 +51,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
@@ -966,10 +967,7 @@ impl DeltaLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key.is_aux_file_key()
{
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None

View File

@@ -48,7 +48,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
@@ -602,10 +603,7 @@ impl ImageLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key.is_aux_file_key()
{
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None

View File

@@ -287,7 +287,6 @@ fn log_compaction_error(
sleep_duration: Duration,
task_cancelled: bool,
) {
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
@@ -295,8 +294,6 @@ fn log_compaction_error(
let level = match err {
ShuttingDown => return,
Offload(_) => Level::ERROR,
CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();

View File

@@ -4,7 +4,6 @@ pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
pub(crate) mod handle;
mod heatmap_layers_downloader;
pub(crate) mod import_pgdata;
mod init;
pub mod layer_manager;
@@ -22,7 +21,6 @@ use chrono::{DateTime, Utc};
use compaction::CompactionOutcome;
use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId;
use layer_manager::Shutdown;
@@ -119,7 +117,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL};
use crate::pgdatadir_mapping::{CalculateLogicalSizeError, MetricsUpdate};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIndex;
@@ -329,7 +327,6 @@ pub struct Timeline {
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
directory_metrics_inited: [AtomicBool; DirectoryKind::KINDS_NUM],
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
/// Ensures layers aren't frozen by checkpointer between
@@ -469,10 +466,6 @@ pub struct Timeline {
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
/// May host a background Tokio task which downloads all the layers from the current
/// heatmap on demand.
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
}
pub(crate) enum PreviousHeatmap {
@@ -1299,7 +1292,7 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
let read_path = if self.conf.enable_read_path_debugging {
Some(ReadPath::new(keyspace.clone(), lsn))
} else {
None
@@ -1882,7 +1875,7 @@ impl Timeline {
// Signal compaction failure to avoid L0 flush stalls when it's broken.
match result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(CompactionError::Other(_)) | Err(CompactionError::CollectKeySpaceError(_)) => {
Err(CompactionError::Other(_)) => {
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
// Don't change the current value on offload failure or shutdown. We don't want to
@@ -2045,11 +2038,6 @@ impl Timeline {
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// If we have a background task downloading heatmap layers stop it.
// The background downloads are sensitive to timeline cancellation (done above),
// so the drain will be immediate.
self.stop_and_drain_heatmap_layers_download().await;
// Ensure Prevent new page service requests from starting.
self.handles.shutdown();
@@ -2367,14 +2355,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.rel_size_v2_enabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2684,7 +2664,6 @@ impl Timeline {
),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
directory_metrics_inited: array::from_fn(|_| AtomicBool::new(false)),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
@@ -2763,8 +2742,6 @@ impl Timeline {
page_trace: Default::default(),
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
heatmap_layers_downloader: Mutex::new(None),
};
result.repartition_threshold =
@@ -2874,7 +2851,6 @@ impl Timeline {
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
validate_wal_contiguity: self.conf.validate_wal_contiguity,
},
broker_client,
ctx,
@@ -3454,42 +3430,8 @@ impl Timeline {
}
}
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) {
// TODO: this directory metrics is not correct -- we could have multiple reldirs in the system
// for each of the database, but we only store one value, and therefore each pgdirmodification
// would overwrite the previous value if they modify different databases.
match count {
MetricsUpdate::Set(count) => {
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
self.directory_metrics_inited[kind.offset()].store(true, AtomicOrdering::Relaxed);
}
MetricsUpdate::Add(count) => {
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
// it's fine.
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
// The metrics has been initialized with `MetricsUpdate::Set` before, so we can add/sub
// the value reliably.
self.directory_metrics[kind.offset()].fetch_add(count, AtomicOrdering::Relaxed);
}
// Otherwise, ignore this update
}
MetricsUpdate::Sub(count) => {
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
// it's fine.
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
// The metrics has been initialized with `MetricsUpdate::Set` before.
// The operation could overflow so we need to normalize the value.
let prev_val =
self.directory_metrics[kind.offset()].load(AtomicOrdering::Relaxed);
let res = prev_val.saturating_sub(count);
self.directory_metrics[kind.offset()].store(res, AtomicOrdering::Relaxed);
}
// Otherwise, ignore this update
}
};
// TODO: remove this, there's no place in the code that updates this aux metrics.
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
let aux_metric =
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
@@ -3707,9 +3649,7 @@ impl Timeline {
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
// figuring out what is the inherited key range and do a fine-grained pruning.
// Do not fire missing key error for sparse keys.
removed.remove_overlapping_with(&KeySpace {
ranges: vec![SPARSE_RANGE],
});
@@ -4606,10 +4546,7 @@ impl Timeline {
));
}
let (dense_ks, sparse_ks) = self
.collect_keyspace(lsn, ctx)
.await
.map_err(CompactionError::CollectKeySpaceError)?;
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
@@ -5130,26 +5067,20 @@ impl Timeline {
// image layer generation taking too long time and blocking L0 compaction. So in this
// mode, we also inspect the current number of L0 layers and skip image layer generation
// if there are too many of them.
let num_of_l0_layers = {
let layers = self.layers.read().await;
layers.layer_map()?.level0_deltas().len()
};
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
// TODO: currently we do not respect `get_image_creation_preempt_threshold` and always yield
// when there is a single timeline with more than L0 threshold L0 layers. As long as the
// `get_image_creation_preempt_threshold` is set to a value greater than 0, we will yield for L0 compaction.
if image_preempt_threshold != 0 {
let should_yield = self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some();
if should_yield {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers",
partition.start().unwrap(), partition.end().unwrap()
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
}
}
@@ -5178,16 +5109,14 @@ impl Timeline {
.map(|l| l.metadata().file_size)
.sum::<u64>();
if !image_layers.is_empty() {
info!(
"created {} image layers ({} bytes) in {}s, processed {} out of {} partitions",
image_layers.len(),
total_layer_size,
duration.as_secs_f64(),
partition_processed,
total_partitions
);
}
info!(
"created {} image layers ({} bytes) in {}s, processed {} out of {} partitions",
image_layers.len(),
total_layer_size,
duration.as_secs_f64(),
partition_processed,
total_partitions
);
Ok((
image_layers,
@@ -5330,8 +5259,6 @@ pub(crate) enum CompactionError {
#[error("Failed to offload timeline: {0}")]
Offload(OffloadError),
/// Compaction cannot be done right now; page reconstruction and so on.
#[error("Failed to collect keyspace: {0}")]
CollectKeySpaceError(CollectKeySpaceError),
#[error(transparent)]
Other(anyhow::Error),
}
@@ -5345,6 +5272,12 @@ impl From<OffloadError> for CompactionError {
}
}
impl CompactionError {
pub fn is_cancelled(&self) -> bool {
matches!(self, CompactionError::ShuttingDown)
}
}
impl From<CollectKeySpaceError> for CompactionError {
fn from(err: CollectKeySpaceError) -> Self {
match err {
@@ -6609,7 +6542,7 @@ impl TimelineWriter<'_> {
if let Some(wait_threshold) = wait_threshold {
if l0_count >= wait_threshold {
debug!("layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers");
info!("layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers");
self.tl.wait_flush_completion(flush_id).await?;
}
}

View File

@@ -11,8 +11,7 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, GetVectoredError,
ImageLayerCreationMode, LastImageLayerCreationStatus, PageReconstructError, RecordedDuration,
Timeline,
ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline,
};
use anyhow::{anyhow, bail, Context};
@@ -26,13 +25,12 @@ use pageserver_api::models::CompactInfoResponse;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::critical;
use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
@@ -303,12 +301,18 @@ impl GcCompactionQueue {
let mut guard = self.inner.lock().unwrap();
guard.gc_guards.insert(id, gc_guard);
}
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
let _ = timeline
.compact_with_options(cancel, options, ctx)
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
.await?;
self.notify_and_unblock(id);
}
}
GcCompactionQueueItem::SubCompactionJob(options) => {
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
let _ = timeline
.compact_with_options(cancel, options, ctx)
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
.await?;
}
GcCompactionQueueItem::Notify(id) => {
self.notify_and_unblock(id);
@@ -688,6 +692,21 @@ impl Timeline {
// Define partitioning schema if needed
let l0_l1_boundary_lsn = {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self.layers.read().await;
let l0_min_lsn = guard
.layer_map()?
.level0_deltas()
.iter()
.map(|l| l.get_lsn_range().start)
.min()
.unwrap_or(self.get_disk_consistent_lsn());
l0_min_lsn.max(self.get_ancestor_lsn())
};
// 1. L0 Compact
let l0_outcome = {
let timer = self.metrics.compact_time_histo.start_timer();
@@ -714,87 +733,86 @@ impl Timeline {
return Ok(CompactionOutcome::YieldForL0);
}
// 2. Repartition and create image layers if necessary
match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
options.flags,
ctx,
)
.await
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
if l0_l1_boundary_lsn < self.partitioning.read().1 {
// We never go backwards when repartition and create image layers.
info!("skipping image layer generation because repartition LSN is greater than L0-L1 boundary LSN.");
} else {
// 2. Repartition and create image layers if necessary
match self
.repartition(
l0_l1_boundary_lsn,
self.get_compaction_target_size(),
options.flags,
ctx,
)
.await
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
self.last_image_layer_creation_status
.load()
.as_ref()
.clone(),
!options.flags.contains(CompactFlags::NoYield),
)
.await
.inspect_err(|err| {
if let CreateImageLayersError::GetVectoredError(
GetVectoredError::MissingKey(_),
) = err
{
critical!("missing key during compaction: {err:?}");
}
})?;
// 3. Create new image layers for partitions that have been modified "enough".
let (image_layers, outcome) = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
self.last_image_layer_creation_status
.load()
.as_ref()
.clone(),
!options.flags.contains(CompactFlags::NoYield),
)
.await
.inspect_err(|err| {
if let CreateImageLayersError::GetVectoredError(
GetVectoredError::MissingKey(_),
) = err
{
critical!("missing key during compaction: {err:?}");
}
})?;
self.last_image_layer_creation_status
.store(Arc::new(outcome.clone()));
self.last_image_layer_creation_status
.store(Arc::new(outcome.clone()));
self.upload_new_image_layers(image_layers)?;
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
// Yield and do not do any other kind of compaction.
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
return Ok(CompactionOutcome::YieldForL0);
self.upload_new_image_layers(image_layers)?;
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
// Yield and do not do any other kind of compaction.
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
return Ok(CompactionOutcome::YieldForL0);
}
}
}
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(CompactionError::ShuttingDown) => {}
// Alert on critical errors that indicate data corruption.
Err(
err @ CompactionError::CollectKeySpaceError(
CollectKeySpaceError::Decode(_)
| CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
),
),
) => critical!("could not compact, repartitioning keyspace failed: {err:?}"),
// Log other errors. No partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
// key-value store, ignoring the datadir layout. Log the error but continue.
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
};
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
//
// Suppress error when it's due to cancellation
if !self.cancel.is_cancelled() && !err.is_cancelled() {
tracing::error!(
"could not compact, repartitioning keyspace failed: {err:?}"
);
}
}
};
}
let partition_count = self.partitioning.read().0 .0.parts.len();
@@ -2212,7 +2230,7 @@ impl Timeline {
let sub_compaction_max_job_size_mb =
sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
let mut compact_jobs = Vec::<GcCompactJob>::new();
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
@@ -2299,25 +2317,16 @@ impl Timeline {
} else {
end
};
if total_size == 0 && !compact_jobs.is_empty() {
info!(
"splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
start, end, total_size
);
compact_jobs.last_mut().unwrap().compact_key_range.end = end;
current_start = Some(end);
} else {
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_jobs.push(GcCompactJob {
dry_run: job.dry_run,
compact_key_range: start..end,
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
});
current_start = Some(end);
}
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_jobs.push(GcCompactJob {
dry_run: job.dry_run,
compact_key_range: start..end,
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
});
current_start = Some(end);
}
}
Ok(compact_jobs)

View File

@@ -1,162 +0,0 @@
//! Timeline utility module to hydrate everything from the current heatmap.
//!
//! Provides utilities to spawn and abort a background task where the downloads happen.
//! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
use futures::StreamExt;
use http_utils::error::ApiError;
use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use super::Timeline;
// This status is not strictly necessary now, but gives us a nice place
// to store progress information if we ever wish to expose it.
pub(super) enum HeatmapLayersDownloadStatus {
InProgress,
Complete,
}
pub(super) struct HeatmapLayersDownloader {
handle: tokio::task::JoinHandle<()>,
status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
cancel: CancellationToken,
downloads_guard: Arc<Gate>,
}
impl HeatmapLayersDownloader {
fn new(
timeline: Arc<Timeline>,
concurrency: usize,
) -> Result<HeatmapLayersDownloader, ApiError> {
let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
let cancel = timeline.cancel.child_token();
let downloads_guard = Arc::new(Gate::default());
let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
let handle = tokio::task::spawn({
let status = status.clone();
let downloads_guard = downloads_guard.clone();
let cancel = cancel.clone();
async move {
let _guard = tl_guard;
scopeguard::defer! {
*status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
}
let Some(heatmap) = timeline.generate_heatmap().await else {
tracing::info!("Heatmap layers download failed to generate heatmap");
return;
};
tracing::info!(
resident_size=%timeline.resident_physical_size(),
heatmap_layers=%heatmap.layers.len(),
"Starting heatmap layers download"
);
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|layer| {
let tl = timeline.clone();
let dl_guard = match downloads_guard.enter() {
Ok(g) => g,
Err(_) => {
// [`Self::shutdown`] was called. Don't spawn any more downloads.
return None;
}
};
Some(async move {
let _dl_guard = dl_guard;
let res = tl.download_layer(&layer.name).await;
if let Err(err) = res {
if !err.is_cancelled() {
tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
}
}
})
}
)).buffered(concurrency);
tokio::select! {
_ = stream.collect::<()>() => {
tracing::info!(
resident_size=%timeline.resident_physical_size(),
"Heatmap layers download completed"
);
},
_ = cancel.cancelled() => {
tracing::info!("Heatmap layers download cancelled");
}
}
}
});
Ok(Self {
status,
handle,
cancel,
downloads_guard,
})
}
fn is_complete(&self) -> bool {
matches!(
*self.status.lock().unwrap(),
HeatmapLayersDownloadStatus::Complete
)
}
/// Drive any in-progress downloads to completion and stop spawning any new ones.
///
/// This has two callers and they behave differently
/// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
/// are sensitive to timeline cancellation.
///
/// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
/// downloads to complete.
async fn stop_and_drain(self) {
// Counterintuitive: close the guard before cancelling.
// Something needs to poll the already created download futures to completion.
// If we cancel first, then the underlying task exits and we lost
// the poller.
self.downloads_guard.close().await;
self.cancel.cancel();
if let Err(err) = self.handle.await {
tracing::warn!("Failed to join heatmap layer downloader task: {err}");
}
}
}
impl Timeline {
pub(crate) async fn start_heatmap_layers_download(
self: &Arc<Self>,
concurrency: usize,
) -> Result<(), ApiError> {
let mut locked = self.heatmap_layers_downloader.lock().unwrap();
if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
*locked = Some(dl);
Ok(())
} else {
Err(ApiError::Conflict("Already running".to_string()))
}
}
pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
// This can race with the start of a new downloader and lead to a situation
// where one donloader is shutting down and another one is in-flight.
// The only impact is that we'd end up using more remote storage semaphore
// units than expected.
let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
if let Some(dl) = downloader {
dl.stop_and_drain().await;
}
}
}

View File

@@ -56,7 +56,6 @@ pub struct WalReceiverConf {
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
pub validate_wal_contiguity: bool,
}
pub struct WalReceiver {

View File

@@ -537,7 +537,6 @@ impl ConnectionManagerState {
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -559,7 +558,6 @@ impl ConnectionManagerState {
ctx,
node_id,
ingest_batch_size,
validate_wal_contiguity,
)
.await;
@@ -1565,7 +1563,6 @@ mod tests {
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
validate_wal_contiguity: false,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),

View File

@@ -120,7 +120,6 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
safekeeper_node: NodeId,
ingest_batch_size: u64,
validate_wal_contiguity: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -275,7 +274,6 @@ pub(super) async fn handle_walreceiver_connection(
} => Some((format, compression)),
};
let mut expected_wal_start = startpoint;
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
@@ -342,49 +340,13 @@ pub(super) async fn handle_walreceiver_connection(
)
})?;
// Guard against WAL gaps. If the start LSN of the PG WAL section
// from which the interpreted records were extracted, doesn't match
// the end of the previous batch (or the starting point for the first batch),
// then kill this WAL receiver connection and start a new one.
if validate_wal_contiguity {
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
}
std::cmp::Ordering::Equal => {}
}
}
}
let InterpretedWalRecords {
records,
next_record_lsn,
raw_wal_start_lsn: _,
} = batch;
tracing::debug!(
"Received WAL up to {} with next_record_lsn={}",
"Received WAL up to {} with next_record_lsn={:?}",
streaming_lsn,
next_record_lsn
);
@@ -461,11 +423,12 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
modification.set_lsn(next_record_lsn).unwrap();
true
} else {
false
let needs_last_record_lsn_advance = match next_record_lsn {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
};
if uncommitted_records > 0 || needs_last_record_lsn_advance {
@@ -483,8 +446,9 @@ pub(super) async fn handle_walreceiver_connection(
timeline.get_last_record_lsn()
);
last_rec_lsn = next_record_lsn;
expected_wal_start = streaming_lsn;
if let Some(lsn) = next_record_lsn {
last_rec_lsn = lsn;
}
Some(streaming_lsn)
}

View File

@@ -1180,50 +1180,6 @@ impl WalIngest {
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// NB: We abuse the Checkpoint.redo field:
//
// - In PostgreSQL, the Checkpoint struct doesn't store the information
// of whether this is an online checkpoint or a shutdown checkpoint. It's
// stored in the XLOG info field of the WAL record, shutdown checkpoints
// use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
// XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
// in the pageserver, however.
//
// - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
// checkpoint record, if it's a shutdown checkpoint. But when we are
// starting from a shutdown checkpoint, the basebackup LSN is the *end*
// of the shutdown checkpoint WAL record. That makes it difficult to
// correctly detect whether we're starting from a shutdown record or
// not.
//
// To address both of those issues, we store 0 in the redo field if it's
// an online checkpoint record, and the record's *end* LSN if it's a
// shutdown checkpoint. We don't need the original redo pointer in neon,
// because we don't perform WAL replay at startup anyway, so we can get
// away with abusing the redo field like this.
//
// XXX: Ideally, we would persist the extra information in a more
// explicit format, rather than repurpose the fields of the Postgres
// struct like this. However, we already have persisted data like this,
// so we need to maintain backwards compatibility.
//
// NB: We didn't originally have this convention, so there are still old
// persisted records that didn't do this. Before, we didn't update the
// persisted redo field at all. That means that old records have a bogus
// redo pointer that points to some old value, from the checkpoint record
// that was originally imported from the data directory. If it was a
// project created in Neon, that means it points to the first checkpoint
// after initdb. That's OK for our purposes: all such old checkpoints are
// treated as old online checkpoints when the basebackup is created.
cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
// Store the *end* LSN of the checkpoint record. Or to be precise,
// the start LSN of the *next* record, i.e. if the record ends
// exactly at page boundary, the redo LSN points to just after the
// page header on the next page.
lsn.into()
} else {
Lsn::INVALID.into()
};
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to

View File

@@ -18,8 +18,6 @@
#include "neon_utils.h"
static int extension_server_port = 0;
static int extension_server_request_timeout = 60;
static int extension_server_connect_timeout = 60;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
@@ -36,18 +34,19 @@ static download_extension_file_hook_type prev_download_extension_file_hook = NUL
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
static CURL *handle = NULL;
CURLcode res;
bool ret = false;
CURL *handle = NULL;
char *compute_ctl_url;
bool ret = false;
handle = alloc_curl_handle();
if (handle == NULL)
{
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
if (extension_server_request_timeout > 0)
curl_easy_setopt(handle, CURLOPT_TIMEOUT, (long)extension_server_request_timeout /* seconds */ );
if (extension_server_connect_timeout > 0)
curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ );
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
extension_server_port, filename, is_library ? "?is_library=true" : "");
@@ -58,8 +57,6 @@ neon_download_extension_file_http(const char *filename, bool is_library)
/* Perform the request, res will get the return code */
res = curl_easy_perform(handle);
curl_easy_cleanup(handle);
/* Check for errors */
if (res == CURLE_OK)
{
@@ -91,24 +88,6 @@ pg_init_extension_server()
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomIntVariable("neon.extension_server_request_timeout",
"timeout for fetching extensions in seconds",
NULL,
&extension_server_request_timeout,
60, 0, INT_MAX,
PGC_SUSET,
GUC_UNIT_S,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.extension_server_connect_timeout",
"timeout for connecting to the extension server in seconds",
NULL,
&extension_server_connect_timeout,
60, 0, INT_MAX,
PGC_SUSET,
GUC_UNIT_S,
NULL, NULL, NULL);
/* set download_extension_file_hook */
prev_download_extension_file_hook = download_extension_file_hook;
download_extension_file_hook = neon_download_extension_file_http;

File diff suppressed because it is too large Load Diff

View File

@@ -122,8 +122,8 @@ addSHLL(HyperLogLogState *cState, uint32 hash)
index = hash >> HLL_C_BITS;
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS) - 1;
Assert(count <= HLL_C_BITS);
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
cState->regs[index][count] = now;
}
@@ -136,7 +136,7 @@ getMaximum(const TimestampTz* reg, TimestampTz since)
{
if (reg[i] >= since)
{
max = i + 1;
max = i;
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
uint32 WAIT_EVENT_NEON_LFC_WRITE;
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
uint32 WAIT_EVENT_NEON_PS_STARTING;
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");

View File

@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
extern uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION

View File

@@ -50,11 +50,18 @@ typedef enum
typedef uint64 NeonRequestId;
/* base struct for c-style inheritance */
typedef struct
{
NeonMessageTag tag;
NeonRequestId reqid;
union {
struct {
int procno; /* process number */
Buffer bufid; /* InvalidBuffer for prefetch */
} recepient;
NeonRequestId reqid; /* two fields above temporary replace reqid, just to preserve protocol */
} u;
XLogRecPtr lsn;
XLogRecPtr not_modified_since;
} NeonMessage;
@@ -140,6 +147,30 @@ typedef struct
int segno;
} NeonGetSlruSegmentRequest;
typedef union {
NeonRequest hdr;
NeonNblocksRequest exists;
NeonNblocksRequest nblocks;
NeonDbSizeRequest dbsize;
NeonGetPageRequest page;
NeonGetSlruSegmentRequest slru;
} NeonCommunicatorRequest;
typedef struct
{
NeonMessageTag tag;
int64 value;
} NeonCommunicatorResponse;
typedef struct
{
uint64 write_pos;
pthread_mutex_t mutex;
pthread_cond_t cond;
NeonCommunicatorRequest* requests;
} NeonCommunicatorChannel;
/* supertype of all the Neon*Response structs below */
typedef NeonMessage NeonResponse;
@@ -161,6 +192,7 @@ typedef struct
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
@@ -184,57 +216,22 @@ typedef struct
} NeonGetSlruSegmentResponse;
extern StringInfoData nm_pack_request(NeonRequest *msg);
extern void nm_pack_request(StringInfo s, NeonRequest *msg);
extern NeonResponse *nm_unpack_response(StringInfo s);
extern char *nm_to_string(NeonMessage *msg);
/*
* API
*/
typedef uint16 shardno_t;
typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*
* Blocking read for the next response of this shard.
*
* When a CANCEL signal is handled, the connection state will be
* unmodified.
*/
NeonResponse *(*receive) (shardno_t shard_no);
/*
* Try get the next response from the TCP buffers, if any.
* Returns NULL when the data is not yet available.
*/
NeonResponse *(*try_receive) (shardno_t shard_no);
/*
* Make sure all requests are sent to PageServer.
*/
bool (*flush) (shardno_t shard_no);
/*
* Disconnect from this pageserver shard.
*/
void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
extern page_server_api *page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;
extern shardno_t get_shard_number(BufferTag* tag);
extern int get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum);
extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
@@ -301,14 +298,17 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern bool lfc_enabled(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
bits8 rv = 1;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
@@ -319,4 +319,9 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
extern void communicator_send_request(int shard, NeonCommunicatorRequest* req);
extern int64 communicator_receive_response(void);
extern int64 communicator_request(int shard, NeonCommunicatorRequest* req);
#endif

File diff suppressed because it is too large Load Diff

25
poetry.lock generated
View File

@@ -412,7 +412,6 @@ files = [
[package.dependencies]
botocore-stubs = "*"
mypy-boto3-kms = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"kms\""}
mypy-boto3-s3 = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"s3\""}
types-s3transfer = "*"
typing-extensions = ">=4.1.0"
@@ -2023,18 +2022,6 @@ install-types = ["pip"]
mypyc = ["setuptools (>=50)"]
reports = ["lxml"]
[[package]]
name = "mypy-boto3-kms"
version = "1.26.147"
description = "Type annotations for boto3.KMS 1.26.147 service generated with mypy-boto3-builder 7.14.5"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "mypy-boto3-kms-1.26.147.tar.gz", hash = "sha256:816a4d1bb0585e1b9620a3f96c1d69a06f53b7b5621858579dd77c60dbb5fa5c"},
{file = "mypy_boto3_kms-1.26.147-py3-none-any.whl", hash = "sha256:493f0db674a25c88769f5cb8ab8ac00d3dda5dfc903d5cda34c990ee64689f79"},
]
[[package]]
name = "mypy-boto3-s3"
version = "1.26.0.post1"
@@ -2771,18 +2758,18 @@ pytest = ">=5,<8"
[[package]]
name = "pytest-timeout"
version = "2.3.1"
version = "2.1.0"
description = "pytest plugin to abort hanging tests"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "pytest-timeout-2.3.1.tar.gz", hash = "sha256:12397729125c6ecbdaca01035b9e5239d4db97352320af155b3f5de1ba5165d9"},
{file = "pytest_timeout-2.3.1-py3-none-any.whl", hash = "sha256:68188cb703edfc6a18fad98dc25a3c61e9f24d644b0b70f33af545219fc7813e"},
{file = "pytest-timeout-2.1.0.tar.gz", hash = "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9"},
{file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"},
]
[package.dependencies]
pytest = ">=7.0.0"
pytest = ">=5.0.0"
[[package]]
name = "pytest-xdist"
@@ -3820,4 +3807,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "00ddc42c32e235b6171845fc066dcab078282ed832cd464d5e8a0afa959dd04a"
content-hash = "4dc3165fe22c0e0f7a030ea0f8a680ae2ff74561d8658c393abbe9112caaf5d7"

View File

@@ -19,6 +19,7 @@ aws-config.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
boxcar = "0.2.8"
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
@@ -62,6 +63,7 @@ postgres_backend.workspace = true
postgres-client = { package = "tokio-postgres2", path = "../libs/proxy/tokio-postgres2" }
postgres-protocol = { package = "postgres-protocol2", path = "../libs/proxy/postgres-protocol2" }
pq_proto.workspace = true
prometheus.workspace = true
rand.workspace = true
regex.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
@@ -79,6 +81,7 @@ sha2 = { workspace = true, features = ["asm", "oid"] }
smol_str.workspace = true
smallvec.workspace = true
socket2.workspace = true
strum.workspace = true
strum_macros.workspace = true
subtle.workspace = true
thiserror.workspace = true
@@ -92,6 +95,7 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
tracing-log.workspace = true
tracing-serde.workspace = true
tracing-opentelemetry.workspace = true
try-lock.workspace = true
typed-json.workspace = true

View File

@@ -140,8 +140,9 @@ async fn authenticate(
let (psql_session_id, waiter) = loop {
let psql_session_id = new_psql_session_id();
if let Ok(waiter) = control_plane::mgmt::get_waiter(&psql_session_id) {
break (psql_session_id, waiter);
match control_plane::mgmt::get_waiter(&psql_session_id) {
Ok(waiter) => break (psql_session_id, waiter),
Err(_e) => continue,
}
};

View File

@@ -220,11 +220,11 @@ async fn fetch_jwks(
}
impl JwkCacheEntryLock {
async fn acquire_permit(self: &Arc<Self>) -> JwkRenewalPermit<'_> {
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
JwkRenewalPermit::acquire_permit(self).await
}
fn try_acquire_permit(self: &Arc<Self>) -> Option<JwkRenewalPermit<'_>> {
fn try_acquire_permit<'a>(self: &'a Arc<Self>) -> Option<JwkRenewalPermit<'a>> {
JwkRenewalPermit::try_acquire_permit(self)
}
@@ -393,7 +393,7 @@ impl JwkCacheEntryLock {
verify_rsa_signature(header_payload.as_bytes(), &sig, key, &header.algorithm)?;
}
key => return Err(JwtError::UnsupportedKeyType(key.into())),
}
};
tracing::debug!(?payload, "JWT signature valid with claims");
@@ -510,7 +510,7 @@ fn verify_rsa_signature(
key.verify(data, &sig)?;
}
_ => return Err(JwtError::InvalidRsaSigningAlgorithm),
}
};
Ok(())
}

View File

@@ -4,20 +4,6 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use compute_api::spec::LocalProxySpec;
use futures::future::Either;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version};
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use crate::auth::{self};
@@ -39,10 +25,24 @@ use crate::serverless::{self, GlobalConnPoolOptions};
use crate::tls::client_config::compute_client_config_with_root_certs;
use crate::types::RoleName;
use crate::url::ApiUrl;
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use compute_api::spec::LocalProxySpec;
use futures::future::Either;
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use clap::Parser;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version};
/// Neon proxy/router
#[derive(Parser)]
#[command(version = GIT_VERSION, about)]

Some files were not shown because too many files have changed in this diff Show More