mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
63 Commits
alexk/comp
...
wal_redo_i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f992cfe86f | ||
|
|
6d82eddecb | ||
|
|
a25813b8a6 | ||
|
|
3a8f50c6c8 | ||
|
|
51f16af1ac | ||
|
|
9e4a223413 | ||
|
|
ef55ee0e6c | ||
|
|
adfd4b1836 | ||
|
|
0ee2b646ab | ||
|
|
212d27f24e | ||
|
|
b1d8771d5f | ||
|
|
3e82addd64 | ||
|
|
5e3c234edc | ||
|
|
ff3819efc7 | ||
|
|
f927ae6e15 | ||
|
|
61d385caea | ||
|
|
c214c32d3f | ||
|
|
9b42d1ce1a | ||
|
|
0b9b391ea0 | ||
|
|
3f376e44ba | ||
|
|
5b81a774fc | ||
|
|
bd335fa751 | ||
|
|
34996416d6 | ||
|
|
d571553d8a | ||
|
|
f7474d3f41 | ||
|
|
e808e9432a | ||
|
|
7c7180a79d | ||
|
|
07bee60037 | ||
|
|
f7edcf12e3 | ||
|
|
1d9346f8b7 | ||
|
|
a6d8640d6f | ||
|
|
bb7e244a42 | ||
|
|
787b98f8f2 | ||
|
|
f148d71d9b | ||
|
|
aad817d806 | ||
|
|
0b3db74c44 | ||
|
|
9ba2a87e69 | ||
|
|
1f9511dbd9 | ||
|
|
aab5482fd5 | ||
|
|
3720cf1c5a | ||
|
|
0453eaf65c | ||
|
|
2d96134a4e | ||
|
|
e52e93797f | ||
|
|
aa115a774c | ||
|
|
2f0d6571a9 | ||
|
|
7199919f04 | ||
|
|
a4e3989c8d | ||
|
|
9d074db18d | ||
|
|
538ea03f73 | ||
|
|
cb8060545d | ||
|
|
9151d3a318 | ||
|
|
381115b68e | ||
|
|
1a69a8cba7 | ||
|
|
ed98f6d57e | ||
|
|
f9a063e2e9 | ||
|
|
f36ec5c84b | ||
|
|
274cb13293 | ||
|
|
290f007b8e | ||
|
|
29e4ca351e | ||
|
|
caece02da7 | ||
|
|
d36baae758 | ||
|
|
f81259967d | ||
|
|
719ec378cd |
4
.github/actionlint.yml
vendored
4
.github/actionlint.yml
vendored
@@ -28,3 +28,7 @@ config-variables:
|
||||
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
|
||||
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
|
||||
- SLACK_CICD_CHANNEL_ID
|
||||
- SLACK_STORAGE_CHANNEL_ID
|
||||
- NEON_DEV_AWS_ACCOUNT_ID
|
||||
- NEON_PROD_AWS_ACCOUNT_ID
|
||||
- AWS_ECR_REGION
|
||||
|
||||
22
.github/actions/neon-project-create/action.yml
vendored
22
.github/actions/neon-project-create/action.yml
vendored
@@ -19,7 +19,11 @@ inputs:
|
||||
default: '[1, 1]'
|
||||
# settings below only needed if you want the project to be sharded from the beginning
|
||||
shard_split_project:
|
||||
description: 'by default new projects are not shard-split, specify true to shard-split'
|
||||
description: 'by default new projects are not shard-split initially, but only when shard-split threshold is reached, specify true to explicitly shard-split initially'
|
||||
required: false
|
||||
default: 'false'
|
||||
disable_sharding:
|
||||
description: 'by default new projects use storage controller default policy to shard-split when shard-split threshold is reached, specify true to explicitly disable sharding'
|
||||
required: false
|
||||
default: 'false'
|
||||
admin_api_key:
|
||||
@@ -107,6 +111,21 @@ runs:
|
||||
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
|
||||
-d "{\"new_shard_count\": $SHARD_COUNT, \"new_stripe_size\": $STRIPE_SIZE}"
|
||||
fi
|
||||
if [ "${DISABLE_SHARDING}" = "true" ]; then
|
||||
# determine tenant ID
|
||||
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
|
||||
|
||||
echo "Explicitly disabling shard-splitting for project ${project_id} with tenant_id ${TENANT_ID}"
|
||||
|
||||
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy"
|
||||
echo "with body {\"scheduling\": \"Essential\"}"
|
||||
|
||||
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
|
||||
curl -X PUT \
|
||||
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy" \
|
||||
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
|
||||
-d "{\"scheduling\": \"Essential\"}"
|
||||
fi
|
||||
|
||||
env:
|
||||
API_HOST: ${{ inputs.api_host }}
|
||||
@@ -116,6 +135,7 @@ runs:
|
||||
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
|
||||
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
|
||||
SHARD_SPLIT_PROJECT: ${{ inputs.shard_split_project }}
|
||||
DISABLE_SHARDING: ${{ inputs.disable_sharding }}
|
||||
ADMIN_API_KEY: ${{ inputs.admin_api_key }}
|
||||
SHARD_COUNT: ${{ inputs.shard_count }}
|
||||
STRIPE_SIZE: ${{ inputs.stripe_size }}
|
||||
|
||||
@@ -2,7 +2,7 @@ name: Push images to Container Registry
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
# Example: {"docker.io/neondatabase/neon:13196061314":["369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
|
||||
# Example: {"docker.io/neondatabase/neon:13196061314":["${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
|
||||
image-map:
|
||||
description: JSON map of images, mapping from a source image to an array of target images that should be pushed.
|
||||
required: true
|
||||
|
||||
41
.github/workflows/build_and_test.yml
vendored
41
.github/workflows/build_and_test.yml
vendored
@@ -68,7 +68,7 @@ jobs:
|
||||
tag:
|
||||
needs: [ check-permissions ]
|
||||
runs-on: [ self-hosted, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
|
||||
outputs:
|
||||
build-tag: ${{steps.build-tag.outputs.tag}}
|
||||
|
||||
@@ -859,14 +859,17 @@ jobs:
|
||||
BRANCH: "${{ github.ref_name }}"
|
||||
DEV_ACR: "${{ vars.AZURE_DEV_REGISTRY_NAME }}"
|
||||
PROD_ACR: "${{ vars.AZURE_PROD_REGISTRY_NAME }}"
|
||||
DEV_AWS: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
|
||||
PROD_AWS: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
|
||||
AWS_REGION: "${{ vars.AWS_ECR_REGION }}"
|
||||
|
||||
push-neon-image-dev:
|
||||
needs: [ generate-image-maps, neon-image ]
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
image-map: '${{ needs.generate-image-maps.outputs.neon-dev }}'
|
||||
aws-region: eu-central-1
|
||||
aws-account-ids: "369495373322"
|
||||
aws-region: ${{ vars.AWS_ECR_REGION }}
|
||||
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
|
||||
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
|
||||
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
|
||||
@@ -881,8 +884,8 @@ jobs:
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
image-map: '${{ needs.generate-image-maps.outputs.compute-dev }}'
|
||||
aws-region: eu-central-1
|
||||
aws-account-ids: "369495373322"
|
||||
aws-region: ${{ vars.AWS_ECR_REGION }}
|
||||
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
|
||||
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
|
||||
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
|
||||
@@ -898,8 +901,8 @@ jobs:
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
image-map: '${{ needs.generate-image-maps.outputs.neon-prod }}'
|
||||
aws-region: eu-central-1
|
||||
aws-account-ids: "093970136003"
|
||||
aws-region: ${{ vars.AWS_ECR_REGION }}
|
||||
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
|
||||
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
|
||||
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
|
||||
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
|
||||
@@ -915,8 +918,8 @@ jobs:
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
image-map: '${{ needs.generate-image-maps.outputs.compute-prod }}'
|
||||
aws-region: eu-central-1
|
||||
aws-account-ids: "093970136003"
|
||||
aws-region: ${{ vars.AWS_ECR_REGION }}
|
||||
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
|
||||
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
|
||||
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
|
||||
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
|
||||
@@ -1029,7 +1032,7 @@ jobs:
|
||||
statuses: write
|
||||
contents: write
|
||||
runs-on: [ self-hosted, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
|
||||
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/ansible:latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
@@ -1178,6 +1181,22 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
|
||||
notify-storage-release-deploy-failure:
|
||||
needs: [ deploy ]
|
||||
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
|
||||
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Post release-deploy failure to team-storage slack channel
|
||||
uses: slackapi/slack-github-action@v2
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
|
||||
text: |
|
||||
🔴 @oncall-storage: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
|
||||
|
||||
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
|
||||
promote-compatibility-data:
|
||||
needs: [ deploy ]
|
||||
@@ -1274,7 +1293,7 @@ jobs:
|
||||
done
|
||||
|
||||
pin-build-tools-image:
|
||||
needs: [ build-build-tools-image, push-compute-image-prod, push-neon-image-prod, build-and-test-locally ]
|
||||
needs: [ build-build-tools-image, test-images, build-and-test-locally ]
|
||||
if: github.ref_name == 'main'
|
||||
uses: ./.github/workflows/pin-build-tools-image.yml
|
||||
with:
|
||||
|
||||
@@ -27,7 +27,7 @@ env:
|
||||
jobs:
|
||||
tag:
|
||||
runs-on: [ self-hosted, small ]
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
|
||||
outputs:
|
||||
build-tag: ${{steps.build-tag.outputs.tag}}
|
||||
|
||||
|
||||
10
.github/workflows/ingest_benchmark.yml
vendored
10
.github/workflows/ingest_benchmark.yml
vendored
@@ -32,18 +32,27 @@ jobs:
|
||||
- target_project: new_empty_project_stripe_size_2048
|
||||
stripe_size: 2048 # 16 MiB
|
||||
postgres_version: 16
|
||||
disable_sharding: false
|
||||
- target_project: new_empty_project_stripe_size_32768
|
||||
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
|
||||
# while here it is sharded from the beginning with a shard size of 256 MiB
|
||||
disable_sharding: false
|
||||
postgres_version: 16
|
||||
- target_project: new_empty_project
|
||||
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
|
||||
disable_sharding: false
|
||||
postgres_version: 16
|
||||
- target_project: new_empty_project
|
||||
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
|
||||
disable_sharding: false
|
||||
postgres_version: 17
|
||||
- target_project: large_existing_project
|
||||
stripe_size: null # cannot re-shard or choose a different stripe size for an existing, already sharded project
|
||||
disable_sharding: false
|
||||
postgres_version: 16
|
||||
- target_project: new_empty_project_unsharded
|
||||
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
|
||||
disable_sharding: true
|
||||
postgres_version: 16
|
||||
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
|
||||
permissions:
|
||||
@@ -96,6 +105,7 @@ jobs:
|
||||
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
|
||||
shard_count: 8
|
||||
stripe_size: ${{ matrix.stripe_size }}
|
||||
disable_sharding: ${{ matrix.disable_sharding }}
|
||||
|
||||
- name: Initialize Neon project
|
||||
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
|
||||
|
||||
92
.github/workflows/pin-build-tools-image.yml
vendored
92
.github/workflows/pin-build-tools-image.yml
vendored
@@ -33,10 +33,6 @@ concurrency:
|
||||
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
|
||||
permissions: {}
|
||||
|
||||
env:
|
||||
FROM_TAG: ${{ inputs.from-tag }}
|
||||
TO_TAG: pinned
|
||||
|
||||
jobs:
|
||||
check-manifests:
|
||||
runs-on: ubuntu-22.04
|
||||
@@ -46,11 +42,14 @@ jobs:
|
||||
steps:
|
||||
- name: Check if we really need to pin the image
|
||||
id: check-manifests
|
||||
env:
|
||||
FROM_TAG: ${{ inputs.from-tag }}
|
||||
TO_TAG: pinned
|
||||
run: |
|
||||
docker manifest inspect neondatabase/build-tools:${FROM_TAG} > ${FROM_TAG}.json
|
||||
docker manifest inspect neondatabase/build-tools:${TO_TAG} > ${TO_TAG}.json
|
||||
docker manifest inspect "docker.io/neondatabase/build-tools:${FROM_TAG}" > "${FROM_TAG}.json"
|
||||
docker manifest inspect "docker.io/neondatabase/build-tools:${TO_TAG}" > "${TO_TAG}.json"
|
||||
|
||||
if diff ${FROM_TAG}.json ${TO_TAG}.json; then
|
||||
if diff "${FROM_TAG}.json" "${TO_TAG}.json"; then
|
||||
skip=true
|
||||
else
|
||||
skip=false
|
||||
@@ -64,55 +63,34 @@ jobs:
|
||||
# use format(..) to catch both inputs.force = true AND inputs.force = 'true'
|
||||
if: needs.check-manifests.outputs.skip == 'false' || format('{0}', inputs.force) == 'true'
|
||||
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
permissions:
|
||||
id-token: write # for `azure/login` and aws auth
|
||||
id-token: write # Required for aws/azure login
|
||||
|
||||
steps:
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
- name: Configure AWS credentials
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
aws-region: eu-central-1
|
||||
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
role-duration-seconds: 3600
|
||||
|
||||
- name: Login to Amazon Dev ECR
|
||||
uses: aws-actions/amazon-ecr-login@v2
|
||||
|
||||
- name: Azure login
|
||||
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
|
||||
with:
|
||||
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
|
||||
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
|
||||
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
|
||||
- name: Login to ACR
|
||||
run: |
|
||||
az acr login --name=neoneastus2
|
||||
|
||||
- name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
|
||||
env:
|
||||
DEFAULT_DEBIAN_VERSION: bookworm
|
||||
run: |
|
||||
for debian_version in bullseye bookworm; do
|
||||
tags=()
|
||||
|
||||
tags+=("-t" "neondatabase/build-tools:${TO_TAG}-${debian_version}")
|
||||
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}-${debian_version}")
|
||||
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}-${debian_version}")
|
||||
|
||||
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
|
||||
tags+=("-t" "neondatabase/build-tools:${TO_TAG}")
|
||||
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}")
|
||||
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}")
|
||||
fi
|
||||
|
||||
docker buildx imagetools create "${tags[@]}" \
|
||||
neondatabase/build-tools:${FROM_TAG}-${debian_version}
|
||||
done
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
image-map: |
|
||||
{
|
||||
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bullseye": [
|
||||
"docker.io/neondatabase/build-tools:pinned-bullseye",
|
||||
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bullseye",
|
||||
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bullseye"
|
||||
],
|
||||
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bookworm": [
|
||||
"docker.io/neondatabase/build-tools:pinned-bookworm",
|
||||
"docker.io/neondatabase/build-tools:pinned",
|
||||
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bookworm",
|
||||
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned",
|
||||
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bookworm",
|
||||
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned"
|
||||
]
|
||||
}
|
||||
aws-region: ${{ vars.AWS_ECR_REGION }}
|
||||
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
|
||||
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
|
||||
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
|
||||
acr-registry-name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
|
||||
secrets:
|
||||
aws-role-to-assume: "${{ vars.DEV_AWS_OIDC_ROLE_ARN }}"
|
||||
docker-hub-username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -1316,7 +1316,6 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -1326,7 +1325,6 @@ dependencies = [
|
||||
"opentelemetry_sdk",
|
||||
"postgres",
|
||||
"postgres_initdb",
|
||||
"prometheus",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
@@ -1345,7 +1343,6 @@ dependencies = [
|
||||
"tower 0.5.2",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
"tracing-utils",
|
||||
"url",
|
||||
@@ -1877,6 +1874,12 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "difflib"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -3334,6 +3337,17 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "json-structural-diff"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
|
||||
dependencies = [
|
||||
"difflib",
|
||||
"regex",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonwebtoken"
|
||||
version = "9.2.0"
|
||||
@@ -6446,6 +6460,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"json-structural-diff",
|
||||
"lasso",
|
||||
"measured",
|
||||
"metrics",
|
||||
@@ -6468,6 +6483,7 @@ dependencies = [
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
@@ -7021,14 +7037,11 @@ dependencies = [
|
||||
name = "tokio-postgres2"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"futures-util",
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol2",
|
||||
@@ -7615,13 +7628,13 @@ dependencies = [
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
"inferno 0.12.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
|
||||
@@ -210,6 +210,7 @@ rustls-native-certs = "0.8"
|
||||
x509-parser = "0.16"
|
||||
whoami = "1.5.1"
|
||||
zerocopy = { version = "0.7", features = ["derive"] }
|
||||
json-structural-diff = { version = "0.2.0" }
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
|
||||
@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.84.1
|
||||
ENV RUSTC_VERSION=1.85.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
@@ -395,15 +395,22 @@ RUN case "${PG_VERSION:?}" in \
|
||||
cd plv8-src && \
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
|
||||
|
||||
FROM pg-build AS plv8-build
|
||||
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
|
||||
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
|
||||
# (The V8 engine takes a very long time to build)
|
||||
FROM build-deps AS plv8-build
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src/plv8-src
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
ninja-build python3-dev libncurses5 binutils clang \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=plv8-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/plv8-src
|
||||
RUN make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) v8
|
||||
|
||||
# Step 2: Build the PostgreSQL-dependent parts
|
||||
COPY --from=pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
ENV PATH="/usr/local/pgsql/bin:$PATH"
|
||||
RUN \
|
||||
# generate and copy upgrade scripts
|
||||
make generate_upgrades && \
|
||||
@@ -1509,6 +1516,73 @@ WORKDIR /ext-src/pg_repack-src
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install
|
||||
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pgaudit"
|
||||
# compile pgaudit extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM build-deps AS pgaudit-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14") \
|
||||
export PGAUDIT_VERSION=1.6.2 \
|
||||
export PGAUDIT_CHECKSUM=1f350d70a0cbf488c0f2b485e3a5c9b11f78ad9e3cbb95ef6904afa1eb3187eb \
|
||||
;; \
|
||||
"v15") \
|
||||
export PGAUDIT_VERSION=1.7.0 \
|
||||
export PGAUDIT_CHECKSUM=8f4a73e451c88c567e516e6cba7dc1e23bc91686bb6f1f77f8f3126d428a8bd8 \
|
||||
;; \
|
||||
"v16") \
|
||||
export PGAUDIT_VERSION=16.0 \
|
||||
export PGAUDIT_CHECKSUM=d53ef985f2d0b15ba25c512c4ce967dce07b94fd4422c95bd04c4c1a055fe738 \
|
||||
;; \
|
||||
"v17") \
|
||||
export PGAUDIT_VERSION=17.0 \
|
||||
export PGAUDIT_CHECKSUM=7d0d08d030275d525f36cd48b38c6455f1023da863385badff0cec44965bfd8c \
|
||||
;; \
|
||||
*) \
|
||||
echo "pgaudit is not supported on this PostgreSQL version" && exit 1;; \
|
||||
esac && \
|
||||
wget https://github.com/pgaudit/pgaudit/archive/refs/tags/${PGAUDIT_VERSION}.tar.gz -O pgaudit.tar.gz && \
|
||||
echo "${PGAUDIT_CHECKSUM} pgaudit.tar.gz" | sha256sum --check && \
|
||||
mkdir pgaudit-src && cd pgaudit-src && tar xzf ../pgaudit.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pgaudit-build
|
||||
COPY --from=pgaudit-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pgaudit-src
|
||||
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pgauditlogtofile"
|
||||
# compile pgauditlogtofile extension
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM build-deps AS pgauditlogtofile-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16" | "v17") \
|
||||
export PGAUDITLOGTOFILE_VERSION=v1.6.4 \
|
||||
export PGAUDITLOGTOFILE_CHECKSUM=ef801eb09c26aaa935c0dabd92c81eb9ebe338930daa9674d420a280c6bc2d70 \
|
||||
;; \
|
||||
*) \
|
||||
echo "pgauditlogtofile is not supported on this PostgreSQL version" && exit 1;; \
|
||||
esac && \
|
||||
wget https://github.com/fmbiete/pgauditlogtofile/archive/refs/tags/${PGAUDITLOGTOFILE_VERSION}.tar.gz -O pgauditlogtofile.tar.gz && \
|
||||
echo "${PGAUDITLOGTOFILE_CHECKSUM} pgauditlogtofile.tar.gz" | sha256sum --check && \
|
||||
mkdir pgauditlogtofile-src && cd pgauditlogtofile-src && tar xzf ../pgauditlogtofile.tar.gz --strip-components=1 -C .
|
||||
|
||||
FROM pg-build AS pgauditlogtofile-build
|
||||
COPY --from=pgauditlogtofile-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pgauditlogtofile-src
|
||||
RUN make install USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN)
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "neon-ext-build"
|
||||
@@ -1602,8 +1676,14 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake
|
||||
# also depends on libduckdb, but a different version.
|
||||
#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1775,14 +1855,20 @@ COPY --from=pg_semver-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_ivm-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_partman-src /ext-src/ /ext-src/
|
||||
#COPY --from=pg_mooncake-src /ext-src/ /ext-src/
|
||||
#COPY --from=pg_repack-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_repack-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY compute/patches/pg_repack.patch /ext-src
|
||||
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
|
||||
|
||||
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
|
||||
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
|
||||
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
|
||||
ENV PATH=/usr/local/pgsql/bin:$PATH
|
||||
ENV PGHOST=compute
|
||||
ENV PGPORT=55433
|
||||
ENV PGUSER=cloud_admin
|
||||
ENV PGDATABASE=postgres
|
||||
ENV PG_VERSION=${PG_VERSION:?}
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
|
||||
72
compute/patches/pg_repack.patch
Normal file
72
compute/patches/pg_repack.patch
Normal file
@@ -0,0 +1,72 @@
|
||||
diff --git a/regress/Makefile b/regress/Makefile
|
||||
index bf6edcb..89b4c7f 100644
|
||||
--- a/regress/Makefile
|
||||
+++ b/regress/Makefile
|
||||
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
|
||||
# Test suite
|
||||
#
|
||||
|
||||
-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
|
||||
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
|
||||
|
||||
USE_PGXS = 1 # use pgxs if not in contrib directory
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
|
||||
index 8d0a94e..63b68bf 100644
|
||||
--- a/regress/expected/nosuper.out
|
||||
+++ b/regress/expected/nosuper.out
|
||||
@@ -4,22 +4,22 @@
|
||||
SET client_min_messages = error;
|
||||
DROP ROLE IF EXISTS nosuper;
|
||||
SET client_min_messages = warning;
|
||||
-CREATE ROLE nosuper WITH LOGIN;
|
||||
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
|
||||
-- => OK
|
||||
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
|
||||
INFO: repacking table "public.tbl_cluster"
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
|
||||
ERROR: pg_repack failed with error: You must be a superuser to use pg_repack
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
ERROR: pg_repack failed with error: ERROR: permission denied for schema repack
|
||||
LINE 1: select repack.version(), repack.version_sql()
|
||||
^
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
|
||||
GRANT USAGE ON SCHEMA repack TO nosuper;
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
INFO: repacking table "public.tbl_cluster"
|
||||
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||
DETAIL: query was: RESET lock_timeout
|
||||
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
|
||||
index 072f0fa..dbe60f8 100644
|
||||
--- a/regress/sql/nosuper.sql
|
||||
+++ b/regress/sql/nosuper.sql
|
||||
@@ -4,19 +4,19 @@
|
||||
SET client_min_messages = error;
|
||||
DROP ROLE IF EXISTS nosuper;
|
||||
SET client_min_messages = warning;
|
||||
-CREATE ROLE nosuper WITH LOGIN;
|
||||
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
|
||||
-- => OK
|
||||
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
|
||||
GRANT USAGE ON SCHEMA repack TO nosuper;
|
||||
|
||||
-- => ERROR
|
||||
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
|
||||
|
||||
REVOKE ALL ON ALL TABLES IN SCHEMA repack FROM nosuper;
|
||||
REVOKE USAGE ON SCHEMA repack FROM nosuper;
|
||||
@@ -25,7 +25,6 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
@@ -48,13 +47,11 @@ tokio-postgres.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
prometheus.workspace = true
|
||||
walkdir.workspace = true
|
||||
|
||||
postgres_initdb.workspace = true
|
||||
|
||||
@@ -41,7 +41,6 @@ use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::SystemTime;
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
@@ -86,19 +85,6 @@ fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a compute ID if one is not supplied. This exists to keep forward
|
||||
/// compatibility tests working, but will be removed in a future iteration.
|
||||
fn generate_compute_id() -> String {
|
||||
let now = SystemTime::now();
|
||||
|
||||
format!(
|
||||
"compute-{}",
|
||||
now.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(rename_all = "kebab-case")]
|
||||
struct Cli {
|
||||
@@ -112,16 +98,13 @@ struct Cli {
|
||||
/// outside the compute will talk to the compute through this port. Keep
|
||||
/// the previous name for this argument around for a smoother release
|
||||
/// with the control plane.
|
||||
///
|
||||
/// TODO: Remove the alias after the control plane release which teaches the
|
||||
/// control plane about the renamed argument.
|
||||
#[arg(long, alias = "http-port", default_value_t = 3080)]
|
||||
#[arg(long, default_value_t = 3080)]
|
||||
pub external_http_port: u16,
|
||||
|
||||
/// The port to bind the internal listening HTTP server to. Clients like
|
||||
/// The port to bind the internal listening HTTP server to. Clients include
|
||||
/// the neon extension (for installing remote extensions) and local_proxy.
|
||||
#[arg(long)]
|
||||
pub internal_http_port: Option<u16>,
|
||||
#[arg(long, default_value_t = 3081)]
|
||||
pub internal_http_port: u16,
|
||||
|
||||
#[arg(short = 'D', long, value_name = "DATADIR")]
|
||||
pub pgdata: String,
|
||||
@@ -156,7 +139,7 @@ struct Cli {
|
||||
#[arg(short = 'S', long, group = "spec-path")]
|
||||
pub spec_path: Option<OsString>,
|
||||
|
||||
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
|
||||
#[arg(short = 'i', long, group = "compute-id")]
|
||||
pub compute_id: String,
|
||||
|
||||
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
|
||||
@@ -359,7 +342,7 @@ fn wait_spec(
|
||||
pgbin: cli.pgbin.clone(),
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
|
||||
internal_http_port: cli.internal_http_port,
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
@@ -383,7 +366,7 @@ fn wait_spec(
|
||||
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
|
||||
Server::Internal(cli.internal_http_port).launch(&compute);
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
|
||||
@@ -361,6 +361,14 @@ async fn run_dump_restore(
|
||||
// how we run it
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir)
|
||||
.env(
|
||||
"ASAN_OPTIONS",
|
||||
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.env(
|
||||
"UBSAN_OPTIONS",
|
||||
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
@@ -394,6 +402,14 @@ async fn run_dump_restore(
|
||||
// how we run it
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir)
|
||||
.env(
|
||||
"ASAN_OPTIONS",
|
||||
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.env(
|
||||
"UBSAN_OPTIONS",
|
||||
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.kill_on_drop(true)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
|
||||
@@ -2,6 +2,7 @@ DO $$
|
||||
DECLARE
|
||||
subname TEXT;
|
||||
BEGIN
|
||||
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
|
||||
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
|
||||
@@ -46,6 +46,8 @@ use std::process::Command;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
@@ -59,6 +61,7 @@ use nix::sys::signal::Signal;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::debug;
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -81,8 +84,10 @@ pub struct EndpointConf {
|
||||
internal_http_port: u16,
|
||||
pg_version: u32,
|
||||
skip_pg_catalog_updates: bool,
|
||||
reconfigure_concurrency: usize,
|
||||
drop_subscriptions_before_start: bool,
|
||||
features: Vec<ComputeFeature>,
|
||||
cluster: Option<Cluster>,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -179,7 +184,9 @@ impl ComputeControlPlane {
|
||||
// we also skip catalog updates in the cloud.
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
cluster: None,
|
||||
});
|
||||
|
||||
ep.create_endpoint_dir()?;
|
||||
@@ -196,7 +203,9 @@ impl ComputeControlPlane {
|
||||
pg_version,
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
cluster: None,
|
||||
})?,
|
||||
)?;
|
||||
std::fs::write(
|
||||
@@ -261,8 +270,11 @@ pub struct Endpoint {
|
||||
skip_pg_catalog_updates: bool,
|
||||
|
||||
drop_subscriptions_before_start: bool,
|
||||
reconfigure_concurrency: usize,
|
||||
// Feature flags
|
||||
features: Vec<ComputeFeature>,
|
||||
// Cluster settings
|
||||
cluster: Option<Cluster>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq)]
|
||||
@@ -302,6 +314,8 @@ impl Endpoint {
|
||||
let conf: EndpointConf =
|
||||
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
|
||||
|
||||
debug!("serialized endpoint conf: {:?}", conf);
|
||||
|
||||
Ok(Endpoint {
|
||||
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
|
||||
external_http_address: SocketAddr::new(
|
||||
@@ -319,8 +333,10 @@ impl Endpoint {
|
||||
tenant_id: conf.tenant_id,
|
||||
pg_version: conf.pg_version,
|
||||
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
|
||||
reconfigure_concurrency: conf.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
|
||||
features: conf.features,
|
||||
cluster: conf.cluster,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -607,7 +623,7 @@ impl Endpoint {
|
||||
};
|
||||
|
||||
// Create spec file
|
||||
let spec = ComputeSpec {
|
||||
let mut spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
@@ -640,7 +656,7 @@ impl Endpoint {
|
||||
Vec::new()
|
||||
},
|
||||
settings: None,
|
||||
postgresql_conf: Some(postgresql_conf),
|
||||
postgresql_conf: Some(postgresql_conf.clone()),
|
||||
},
|
||||
delta_operations: None,
|
||||
tenant_id: Some(self.tenant_id),
|
||||
@@ -653,9 +669,35 @@ impl Endpoint {
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: 1,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
};
|
||||
|
||||
// this strange code is needed to support respec() in tests
|
||||
if self.cluster.is_some() {
|
||||
debug!("Cluster is already set in the endpoint spec, using it");
|
||||
spec.cluster = self.cluster.clone().unwrap();
|
||||
|
||||
debug!("spec.cluster {:?}", spec.cluster);
|
||||
|
||||
// fill missing fields again
|
||||
if create_test_user {
|
||||
spec.cluster.roles.push(Role {
|
||||
name: PgIdent::from_str("test").unwrap(),
|
||||
encrypted_password: None,
|
||||
options: None,
|
||||
});
|
||||
spec.cluster.databases.push(Database {
|
||||
name: PgIdent::from_str("neondb").unwrap(),
|
||||
owner: PgIdent::from_str("test").unwrap(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
}
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
}
|
||||
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -673,18 +715,14 @@ impl Endpoint {
|
||||
println!("Also at '{}'", conn_str);
|
||||
}
|
||||
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
|
||||
//cmd.args([
|
||||
// "--external-http-port",
|
||||
// &self.external_http_address.port().to_string(),
|
||||
//])
|
||||
//.args([
|
||||
// "--internal-http-port",
|
||||
// &self.internal_http_address.port().to_string(),
|
||||
//])
|
||||
cmd.args([
|
||||
"--http-port",
|
||||
"--external-http-port",
|
||||
&self.external_http_address.port().to_string(),
|
||||
])
|
||||
.args([
|
||||
"--internal-http-port",
|
||||
&self.internal_http_address.port().to_string(),
|
||||
])
|
||||
.args(["--pgdata", self.pgdata().to_str().unwrap()])
|
||||
.args(["--connstr", &conn_str])
|
||||
.args([
|
||||
@@ -701,20 +739,16 @@ impl Endpoint {
|
||||
])
|
||||
// TODO: It would be nice if we generated compute IDs with the same
|
||||
// algorithm as the real control plane.
|
||||
//
|
||||
// TODO: Add this back when
|
||||
// https://github.com/neondatabase/neon/pull/10747 is merged.
|
||||
//
|
||||
//.args([
|
||||
// "--compute-id",
|
||||
// &format!(
|
||||
// "compute-{}",
|
||||
// SystemTime::now()
|
||||
// .duration_since(UNIX_EPOCH)
|
||||
// .unwrap()
|
||||
// .as_secs()
|
||||
// ),
|
||||
//])
|
||||
.args([
|
||||
"--compute-id",
|
||||
&format!(
|
||||
"compute-{}",
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
),
|
||||
])
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
@@ -335,13 +335,21 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
checkpoint_timeout: settings
|
||||
.remove("checkpoint_timeout")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_timeout' as duration")?,
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_period: settings
|
||||
.remove("compaction_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_period' as duration")?,
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
@@ -387,7 +395,10 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
gc_period: settings.remove("gc_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_period' as duration")?,
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
@@ -403,13 +414,20 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
pitr_interval: settings.remove("pitr_interval")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'pitr_interval' as duration")?,
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lagging_wal_timeout' as duration")?,
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
@@ -427,8 +445,14 @@ impl PageServerNode {
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
|
||||
heatmap_period: settings
|
||||
.remove("heatmap_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'heatmap_period' as duration")?,
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
.map(|x| x.parse::<bool>())
|
||||
@@ -439,10 +463,15 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `timeline_get_throttle` from json")?,
|
||||
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
|
||||
lsn_lease_length: settings.remove("lsn_lease_length")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lsn_lease_length' as duration")?,
|
||||
lsn_lease_length_for_ts: settings
|
||||
.remove("lsn_lease_length_for_ts")
|
||||
.map(|x| x.to_string()),
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
|
||||
timeline_offloading: settings
|
||||
.remove("timeline_offloading")
|
||||
.map(|x| x.parse::<bool>())
|
||||
|
||||
@@ -22,7 +22,7 @@ use pageserver_api::{
|
||||
};
|
||||
use pageserver_client::mgmt_api::{self};
|
||||
use reqwest::{Method, StatusCode, Url};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
use pageserver_api::controller_api::{
|
||||
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
@@ -47,6 +47,9 @@ enum Command {
|
||||
listen_http_addr: String,
|
||||
#[arg(long)]
|
||||
listen_http_port: u16,
|
||||
#[arg(long)]
|
||||
listen_https_port: Option<u16>,
|
||||
|
||||
#[arg(long)]
|
||||
availability_zone_id: String,
|
||||
},
|
||||
@@ -239,6 +242,19 @@ enum Command {
|
||||
#[arg(long)]
|
||||
scheduling_policy: SkSchedulingPolicyArg,
|
||||
},
|
||||
/// Downloads any missing heatmap layers for all shard for a given timeline
|
||||
DownloadHeatmapLayers {
|
||||
/// Tenant ID or tenant shard ID. When an unsharded tenant ID is specified,
|
||||
/// the operation is performed on all shards. When a sharded tenant ID is
|
||||
/// specified, the operation is only performed on the specified shard.
|
||||
#[arg(long)]
|
||||
tenant_shard_id: TenantShardId,
|
||||
#[arg(long)]
|
||||
timeline_id: TimelineId,
|
||||
/// Optional: Maximum download concurrency (default is 16)
|
||||
#[arg(long)]
|
||||
concurrency: Option<usize>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -381,6 +397,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
availability_zone_id,
|
||||
} => {
|
||||
storcon_client
|
||||
@@ -393,6 +410,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
availability_zone_id: AvailabilityZone(availability_zone_id),
|
||||
}),
|
||||
)
|
||||
@@ -941,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
threshold: threshold.into(),
|
||||
},
|
||||
)),
|
||||
heatmap_period: Some("300s".to_string()),
|
||||
heatmap_period: Some(Duration::from_secs(300)),
|
||||
..Default::default()
|
||||
},
|
||||
})
|
||||
@@ -1247,6 +1265,24 @@ async fn main() -> anyhow::Result<()> {
|
||||
String::from(scheduling_policy)
|
||||
);
|
||||
}
|
||||
Command::DownloadHeatmapLayers {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
concurrency,
|
||||
} => {
|
||||
let mut path = format!(
|
||||
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
|
||||
tenant_shard_id, timeline_id,
|
||||
);
|
||||
|
||||
if let Some(c) = concurrency {
|
||||
path = format!("{path}?concurrency={c}");
|
||||
}
|
||||
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(Method::POST, path, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -77,4 +77,5 @@ echo "Start compute node"
|
||||
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
|
||||
-C "postgresql://cloud_admin@localhost:55433/postgres" \
|
||||
-b /usr/local/bin/postgres \
|
||||
--compute-id "compute-$RANDOM" \
|
||||
-S ${SPEC_FILE}
|
||||
|
||||
@@ -81,15 +81,8 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
|
||||
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
|
||||
for d in $FAILED $CONTRIB_FAILED; do
|
||||
dn="$(basename $d)"
|
||||
rm -rf $dn
|
||||
mkdir $dn
|
||||
docker cp $TEST_CONTAINER_NAME:$d/regression.diffs $dn || [ $? -eq 1 ]
|
||||
docker cp $TEST_CONTAINER_NAME:$d/regression.out $dn || [ $? -eq 1 ]
|
||||
cat $dn/regression.out $dn/regression.diffs || true
|
||||
rm -rf $dn
|
||||
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
|
||||
done
|
||||
rm -rf $FAILED
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
5
docker-compose/ext-src/pg_repack-src/test-upgrade.sh
Executable file
5
docker-compose/ext-src/pg_repack-src/test-upgrade.sh
Executable file
@@ -0,0 +1,5 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --use-existing --inputdir=./regress --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
|
||||
24
docker-compose/ext-src/pg_semver-src/test-upgrade-17.patch
Normal file
24
docker-compose/ext-src/pg_semver-src/test-upgrade-17.patch
Normal file
@@ -0,0 +1,24 @@
|
||||
diff --git a/test/sql/base.sql b/test/sql/base.sql
|
||||
index 53adb30..2eed91b 100644
|
||||
--- a/test/sql/base.sql
|
||||
+++ b/test/sql/base.sql
|
||||
@@ -2,7 +2,6 @@
|
||||
BEGIN;
|
||||
|
||||
\i test/pgtap-core.sql
|
||||
-CREATE EXTENSION semver;
|
||||
|
||||
SELECT plan(334);
|
||||
--SELECT * FROM no_plan();
|
||||
diff --git a/test/sql/corpus.sql b/test/sql/corpus.sql
|
||||
index c0fe98e..39cdd2e 100644
|
||||
--- a/test/sql/corpus.sql
|
||||
+++ b/test/sql/corpus.sql
|
||||
@@ -4,7 +4,6 @@ BEGIN;
|
||||
-- Test the SemVer corpus from https://regex101.com/r/Ly7O1x/3/.
|
||||
|
||||
\i test/pgtap-core.sql
|
||||
-CREATE EXTENSION semver;
|
||||
|
||||
SELECT plan(76);
|
||||
--SELECT * FROM no_plan();
|
||||
@@ -1,6 +1,7 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
patch -p1 <test-upgrade-${PG_VERSION}.patch
|
||||
psql -d contrib_regression -c "DROP EXTENSION IF EXISTS pgtap"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression base corpus
|
||||
@@ -1,3 +1,16 @@
|
||||
diff --git a/Makefile b/Makefile
|
||||
index f255fe6..0a0fa65 100644
|
||||
--- a/Makefile
|
||||
+++ b/Makefile
|
||||
@@ -346,7 +346,7 @@ test: test-serial test-parallel
|
||||
TB_DIR = test/build
|
||||
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
|
||||
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
|
||||
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
|
||||
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
|
||||
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
|
||||
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
|
||||
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))
|
||||
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
|
||||
index ba355ed..7e250f5 100644
|
||||
--- a/test/schedule/create.sql
|
||||
|
||||
@@ -2,5 +2,4 @@
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing
|
||||
make installcheck
|
||||
@@ -2,4 +2,5 @@
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression plv8 plv8-errors scalar_args inline json startup_pre startup varparam json_conv jsonb_conv window guc es6 arraybuffer composites currentresource startup_perms bytea find_function_perms memory_limits reset show array_spread regression dialect bigint procedure
|
||||
REGRESS="$(make -n installcheck | awk '{print substr($0,index($0,"init-extension")+15);}')"
|
||||
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression ${REGRESS}
|
||||
@@ -43,7 +43,8 @@ EXTENSIONS='[
|
||||
{"extname": "semver", "extdir": "pg_semver-src"},
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"},
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"}
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"},
|
||||
{"extname": "pg_repack", "extdir": "pg_repack-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
@@ -59,6 +60,8 @@ wait_for_ready
|
||||
docker compose cp ext-src neon-test-extensions:/
|
||||
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
|
||||
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
|
||||
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
|
||||
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
|
||||
create_extensions "${EXTNAMES}"
|
||||
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
|
||||
exts="${EXTNAMES}"
|
||||
|
||||
@@ -252,7 +252,7 @@ pub enum ComputeMode {
|
||||
Replica,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Cluster {
|
||||
pub cluster_id: Option<String>,
|
||||
pub name: Option<String>,
|
||||
@@ -283,7 +283,7 @@ pub struct DeltaOp {
|
||||
|
||||
/// Rust representation of Postgres role info with only those fields
|
||||
/// that matter for us.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Role {
|
||||
pub name: PgIdent,
|
||||
pub encrypted_password: Option<String>,
|
||||
@@ -292,7 +292,7 @@ pub struct Role {
|
||||
|
||||
/// Rust representation of Postgres database info with only those fields
|
||||
/// that matter for us.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Database {
|
||||
pub name: PgIdent,
|
||||
pub owner: PgIdent,
|
||||
@@ -308,7 +308,7 @@ pub struct Database {
|
||||
/// Common type representing both SQL statement params with or without value,
|
||||
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
|
||||
/// options like `wal_level = logical`.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct GenericOption {
|
||||
pub name: String,
|
||||
pub value: Option<String>,
|
||||
|
||||
@@ -2,7 +2,6 @@ use anyhow::bail;
|
||||
use flate2::write::{GzDecoder, GzEncoder};
|
||||
use flate2::Compression;
|
||||
use itertools::Itertools as _;
|
||||
use once_cell::sync::Lazy;
|
||||
use pprof::protos::{Function, Line, Location, Message as _, Profile};
|
||||
use regex::Regex;
|
||||
|
||||
@@ -58,38 +57,30 @@ pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
|
||||
|
||||
// Resolve the line and function for each location.
|
||||
backtrace::resolve(loc.address as *mut c_void, |symbol| {
|
||||
let Some(symname) = symbol.name() else {
|
||||
let Some(symbol_name) = symbol.name() else {
|
||||
return;
|
||||
};
|
||||
let mut name = symname.to_string();
|
||||
|
||||
// Strip the Rust monomorphization suffix from the symbol name.
|
||||
static SUFFIX_REGEX: Lazy<Regex> =
|
||||
Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex"));
|
||||
if let Some(m) = SUFFIX_REGEX.find(&name) {
|
||||
name.truncate(m.start());
|
||||
}
|
||||
|
||||
let function_id = match functions.get(&name) {
|
||||
Some(function) => function.id,
|
||||
None => {
|
||||
let id = functions.len() as u64 + 1;
|
||||
let system_name = String::from_utf8_lossy(symname.as_bytes());
|
||||
let function_name = format!("{symbol_name:#}");
|
||||
let functions_len = functions.len();
|
||||
let function_id = functions
|
||||
.entry(function_name)
|
||||
.or_insert_with_key(|function_name| {
|
||||
let function_id = functions_len as u64 + 1;
|
||||
let system_name = String::from_utf8_lossy(symbol_name.as_bytes());
|
||||
let filename = symbol
|
||||
.filename()
|
||||
.map(|path| path.to_string_lossy())
|
||||
.unwrap_or(Cow::Borrowed(""));
|
||||
let function = Function {
|
||||
id,
|
||||
name: string_id(&name),
|
||||
Function {
|
||||
id: function_id,
|
||||
name: string_id(function_name),
|
||||
system_name: string_id(&system_name),
|
||||
filename: string_id(&filename),
|
||||
..Default::default()
|
||||
};
|
||||
functions.insert(name, function);
|
||||
id
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
.id;
|
||||
loc.line.push(Line {
|
||||
function_id,
|
||||
line: symbol.lineno().unwrap_or(0) as i64,
|
||||
|
||||
@@ -122,6 +122,8 @@ pub struct ConfigToml {
|
||||
pub page_service_pipelining: PageServicePipeliningConfig,
|
||||
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
pub enable_read_path_debugging: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub validate_wal_contiguity: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
|
||||
} else {
|
||||
None
|
||||
},
|
||||
validate_wal_contiguity: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -544,10 +547,11 @@ pub mod tenant_conf_defaults {
|
||||
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
|
||||
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
|
||||
|
||||
// This value needs to be tuned to avoid OOM. We have 3/4 of the total CPU threads to do background works, that's 16*3/4=9 on
|
||||
// most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole
|
||||
// calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB.
|
||||
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50;
|
||||
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
|
||||
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
|
||||
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
|
||||
// with this config, we can get a maximum peak compaction usage of 9 GB.
|
||||
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
|
||||
pub const DEFAULT_COMPACTION_L0_FIRST: bool = false;
|
||||
pub const DEFAULT_COMPACTION_L0_SEMAPHORE: bool = true;
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ pub struct NodeRegisterRequest {
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
pub listen_https_port: Option<u16>,
|
||||
|
||||
pub availability_zone_id: AvailabilityZone,
|
||||
}
|
||||
@@ -105,6 +106,7 @@ pub struct TenantLocateResponseShard {
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
pub listen_https_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -148,6 +150,7 @@ pub struct NodeDescribeResponse {
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
pub listen_https_port: Option<u16>,
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
|
||||
@@ -526,9 +526,13 @@ pub struct TenantConfigPatch {
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
|
||||
pub struct TenantConfig {
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub checkpoint_timeout: Option<Duration>,
|
||||
pub compaction_target_size: Option<u64>,
|
||||
pub compaction_period: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub compaction_period: Option<Duration>,
|
||||
pub compaction_threshold: Option<usize>,
|
||||
pub compaction_upper_limit: Option<usize>,
|
||||
// defer parsing compaction_algorithm, like eviction_policy
|
||||
@@ -539,22 +543,38 @@ pub struct TenantConfig {
|
||||
pub l0_flush_stall_threshold: Option<usize>,
|
||||
pub l0_flush_wait_upload: Option<bool>,
|
||||
pub gc_horizon: Option<u64>,
|
||||
pub gc_period: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_period: Option<Duration>,
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
pub pitr_interval: Option<String>,
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub walreceiver_connect_timeout: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub heatmap_period: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Option<Duration>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
pub image_creation_preempt_threshold: Option<usize>,
|
||||
pub lsn_lease_length: Option<String>,
|
||||
pub lsn_lease_length_for_ts: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lsn_lease_length_for_ts: Option<Duration>,
|
||||
pub timeline_offloading: Option<bool>,
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
pub rel_size_v2_enabled: Option<bool>,
|
||||
@@ -564,7 +584,10 @@ pub struct TenantConfig {
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
|
||||
pub fn apply_patch(
|
||||
self,
|
||||
patch: TenantConfigPatch,
|
||||
) -> Result<TenantConfig, humantime::DurationError> {
|
||||
let Self {
|
||||
mut checkpoint_distance,
|
||||
mut checkpoint_timeout,
|
||||
@@ -604,11 +627,17 @@ impl TenantConfig {
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
|
||||
patch
|
||||
.checkpoint_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut checkpoint_timeout);
|
||||
patch
|
||||
.compaction_target_size
|
||||
.apply(&mut compaction_target_size);
|
||||
patch.compaction_period.apply(&mut compaction_period);
|
||||
patch
|
||||
.compaction_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut compaction_period);
|
||||
patch.compaction_threshold.apply(&mut compaction_threshold);
|
||||
patch
|
||||
.compaction_upper_limit
|
||||
@@ -626,15 +655,25 @@ impl TenantConfig {
|
||||
.apply(&mut l0_flush_stall_threshold);
|
||||
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
|
||||
patch.gc_horizon.apply(&mut gc_horizon);
|
||||
patch.gc_period.apply(&mut gc_period);
|
||||
patch
|
||||
.gc_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut gc_period);
|
||||
patch
|
||||
.image_creation_threshold
|
||||
.apply(&mut image_creation_threshold);
|
||||
patch.pitr_interval.apply(&mut pitr_interval);
|
||||
patch
|
||||
.pitr_interval
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut pitr_interval);
|
||||
patch
|
||||
.walreceiver_connect_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut walreceiver_connect_timeout);
|
||||
patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
|
||||
patch
|
||||
.lagging_wal_timeout
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lagging_wal_timeout);
|
||||
patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
|
||||
patch.eviction_policy.apply(&mut eviction_policy);
|
||||
patch
|
||||
@@ -642,8 +681,12 @@ impl TenantConfig {
|
||||
.apply(&mut min_resident_size_override);
|
||||
patch
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut evictions_low_residence_duration_metric_threshold);
|
||||
patch.heatmap_period.apply(&mut heatmap_period);
|
||||
patch
|
||||
.heatmap_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut heatmap_period);
|
||||
patch.lazy_slru_download.apply(&mut lazy_slru_download);
|
||||
patch
|
||||
.timeline_get_throttle
|
||||
@@ -654,9 +697,13 @@ impl TenantConfig {
|
||||
patch
|
||||
.image_creation_preempt_threshold
|
||||
.apply(&mut image_creation_preempt_threshold);
|
||||
patch.lsn_lease_length.apply(&mut lsn_lease_length);
|
||||
patch
|
||||
.lsn_lease_length
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lsn_lease_length);
|
||||
patch
|
||||
.lsn_lease_length_for_ts
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut lsn_lease_length_for_ts);
|
||||
patch.timeline_offloading.apply(&mut timeline_offloading);
|
||||
patch
|
||||
@@ -673,7 +720,7 @@ impl TenantConfig {
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
checkpoint_timeout,
|
||||
compaction_target_size,
|
||||
@@ -709,7 +756,7 @@ impl TenantConfig {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1080,8 +1127,7 @@ pub struct TenantInfo {
|
||||
|
||||
/// Opaque explanation if gc is being blocked.
|
||||
///
|
||||
/// Only looked up for the individual tenant detail, not the listing. This is purely for
|
||||
/// debugging, not included in openapi.
|
||||
/// Only looked up for the individual tenant detail, not the listing.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_blocking: Option<String>,
|
||||
}
|
||||
@@ -2504,7 +2550,7 @@ mod tests {
|
||||
..base.clone()
|
||||
};
|
||||
|
||||
let patched = base.apply_patch(decoded.config);
|
||||
let patched = base.apply_patch(decoded.config).unwrap();
|
||||
|
||||
assert_eq!(patched, expected);
|
||||
}
|
||||
|
||||
@@ -278,7 +278,7 @@ pub fn generate_pg_control(
|
||||
checkpoint_bytes: &[u8],
|
||||
lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<(Bytes, u64)> {
|
||||
) -> anyhow::Result<(Bytes, u64, bool)> {
|
||||
dispatch_pgversion!(
|
||||
pg_version,
|
||||
pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
|
||||
|
||||
@@ -124,23 +124,59 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
|
||||
///
|
||||
/// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
|
||||
/// the pageserver. They use the same format as the PostgreSQL control file and the
|
||||
/// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
|
||||
/// 'lsn' is the LSN at which we're starting up.
|
||||
///
|
||||
/// Returns:
|
||||
/// - pg_control file contents
|
||||
/// - system_identifier, extracted from the persisted information
|
||||
/// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
|
||||
/// checkpoint at the given LSN
|
||||
pub fn generate_pg_control(
|
||||
pg_control_bytes: &[u8],
|
||||
checkpoint_bytes: &[u8],
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<(Bytes, u64)> {
|
||||
) -> anyhow::Result<(Bytes, u64, bool)> {
|
||||
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
|
||||
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
|
||||
|
||||
// Generate new pg_control needed for bootstrap
|
||||
//
|
||||
// NB: In the checkpoint struct that we persist in the pageserver, we have a different
|
||||
// convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
|
||||
// 'redo' points to the *end* of the checkpoint WAL record. On PostgreSQL, it points to
|
||||
// the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
|
||||
//
|
||||
// We didn't always have this convention however, and old persisted records will have
|
||||
// old REDO values that point to some old LSN.
|
||||
//
|
||||
// The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
|
||||
// checkpoint record at that point in WAL, with no new WAL records after it. That case
|
||||
// can be treated as starting from a clean shutdown. All other cases are treated as
|
||||
// non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
|
||||
// that distinction doesn't matter very much. As of this writing, it only affects
|
||||
// whether the persisted pg_stats information can be used or not.
|
||||
//
|
||||
// In the Checkpoint struct in the returned pg_control file, the redo pointer is
|
||||
// always set to the LSN we're starting at, to hint that no WAL replay is required.
|
||||
// (There's some neon-specific code in Postgres startup to make that work, though.
|
||||
// Just setting the redo pointer is not sufficient.)
|
||||
let was_shutdown = Lsn(checkpoint.redo) == lsn;
|
||||
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
|
||||
|
||||
//save new values in pg_control
|
||||
// We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
|
||||
// neon-specific code at postgres startup ignores the state stored in the control
|
||||
// file, similar to archive recovery in standalone PostgreSQL. Similarly, the
|
||||
// checkPoint pointer is ignored, so just set it to 0.
|
||||
pg_control.checkPoint = 0;
|
||||
pg_control.checkPointCopy = checkpoint;
|
||||
pg_control.state = DBState_DB_SHUTDOWNED;
|
||||
|
||||
Ok((pg_control.encode(), pg_control.system_identifier))
|
||||
Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
|
||||
}
|
||||
|
||||
pub fn get_current_timestamp() -> TimestampTz {
|
||||
|
||||
@@ -5,18 +5,15 @@ edition = "2021"
|
||||
license = "MIT/Apache-2.0"
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
byteorder.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
futures-util = { workspace = true, features = ["sink"] }
|
||||
log = "0.4"
|
||||
parking_lot.workspace = true
|
||||
percent-encoding = "2.0"
|
||||
pin-project-lite.workspace = true
|
||||
phf = "0.11"
|
||||
postgres-protocol2 = { path = "../postgres-protocol2" }
|
||||
postgres-types2 = { path = "../postgres-types2" }
|
||||
tokio = { workspace = true, features = ["io-util", "time", "net"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -9,13 +9,43 @@ use anyhow::bail;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
|
||||
/// Number uniquely identifying safekeeper configuration.
|
||||
/// Note: it is a part of sk control file.
|
||||
pub type Generation = u32;
|
||||
/// 1 is the first valid generation, 0 is used as
|
||||
/// a placeholder before we fully migrate to generations.
|
||||
pub const INVALID_GENERATION: Generation = 0;
|
||||
pub const INITIAL_GENERATION: Generation = 1;
|
||||
pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
|
||||
pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
|
||||
|
||||
/// Number uniquely identifying safekeeper configuration.
|
||||
/// Note: it is a part of sk control file.
|
||||
///
|
||||
/// Like tenant generations, but for safekeepers.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub struct SafekeeperGeneration(u32);
|
||||
|
||||
impl SafekeeperGeneration {
|
||||
pub const fn new(v: u32) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn previous(&self) -> Option<Self> {
|
||||
Some(Self(self.0.checked_sub(1)?))
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn next(&self) -> Self {
|
||||
Self(self.0 + 1)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for SafekeeperGeneration {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Membership is defined by ids so e.g. walproposer uses them to figure out
|
||||
/// quorums, but we also carry host and port to give wp an idea of where to connect.
|
||||
@@ -89,7 +119,7 @@ impl Display for MemberSet {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct Configuration {
|
||||
/// Unique id.
|
||||
pub generation: Generation,
|
||||
pub generation: SafekeeperGeneration,
|
||||
/// Current members of the configuration.
|
||||
pub members: MemberSet,
|
||||
/// Some means it is a joint conf.
|
||||
|
||||
@@ -282,3 +282,18 @@ pub struct TimelineTermBumpResponse {
|
||||
pub struct SafekeeperUtilization {
|
||||
pub timeline_count: u64,
|
||||
}
|
||||
|
||||
/// pull_timeline request body.
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct PullTimelineRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PullTimelineResponse {
|
||||
// Donor safekeeper host
|
||||
pub safekeeper_host: String,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
@@ -24,11 +24,10 @@ diatomic-waker.workspace = true
|
||||
git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
inferno.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true }
|
||||
jsonwebtoken.workspace = true
|
||||
nix = {workspace = true, features = [ "ioctl" ] }
|
||||
nix = { workspace = true, features = ["ioctl"] }
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
regex.workspace = true
|
||||
@@ -62,6 +61,7 @@ bytes.workspace = true
|
||||
criterion.workspace = true
|
||||
hex-literal.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
pprof.workspace = true
|
||||
serde_assert.workspace = true
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
|
||||
|
||||
26
libs/utils/benches/README.md
Normal file
26
libs/utils/benches/README.md
Normal file
@@ -0,0 +1,26 @@
|
||||
## Utils Benchmarks
|
||||
|
||||
To run benchmarks:
|
||||
|
||||
```sh
|
||||
# All benchmarks.
|
||||
cargo bench --package utils
|
||||
|
||||
# Specific file.
|
||||
cargo bench --package utils --bench benchmarks
|
||||
|
||||
# Specific benchmark.
|
||||
cargo bench --package utils --bench benchmarks warn_slow/enabled=true
|
||||
|
||||
# List available benchmarks.
|
||||
cargo bench --package utils --benches -- --list
|
||||
|
||||
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
|
||||
# Output in target/criterion/*/profile/flamegraph.svg.
|
||||
cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
|
||||
```
|
||||
|
||||
Additional charts and statistics are available in `target/criterion/report/index.html`.
|
||||
|
||||
Benchmarks are automatically compared against the previous run. To compare against other runs, see
|
||||
`--baseline` and `--save-baseline`.
|
||||
@@ -1,5 +1,18 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use std::time::Duration;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use utils::id;
|
||||
use utils::logging::warn_slow;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_id_stringify,
|
||||
bench_warn_slow,
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
pub fn bench_id_stringify(c: &mut Criterion) {
|
||||
// Can only use public methods.
|
||||
@@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) {
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_id_stringify);
|
||||
criterion_main!(benches);
|
||||
pub fn bench_warn_slow(c: &mut Criterion) {
|
||||
for enabled in [false, true] {
|
||||
c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
|
||||
run_bench(b, enabled).unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
// The actual benchmark.
|
||||
fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> {
|
||||
const THRESHOLD: Duration = Duration::from_secs(1);
|
||||
|
||||
// Use a multi-threaded runtime to avoid thread parking overhead when yielding.
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
// Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
|
||||
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
|
||||
// paths for a ready future.
|
||||
if enabled {
|
||||
b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
|
||||
} else {
|
||||
b.iter(|| runtime.block_on(tokio::task::yield_now()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,6 +286,11 @@ mod tests {
|
||||
const SHORT2_ENC_LE: &[u8] = &[8, 0, 0, 3, 7];
|
||||
const SHORT2_ENC_LE_TRAILING: &[u8] = &[8, 0, 0, 3, 7, 0xff, 0xff, 0xff];
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
struct NewTypeStruct(u32);
|
||||
const NT1: NewTypeStruct = NewTypeStruct(414243);
|
||||
const NT1_INNER: u32 = 414243;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct LongMsg {
|
||||
pub tag: u8,
|
||||
@@ -408,4 +413,42 @@ mod tests {
|
||||
let msg2 = LongMsg::des(&encoded).unwrap();
|
||||
assert_eq!(msg, msg2);
|
||||
}
|
||||
|
||||
/// A newtype wrapper around `u32` must serialize (big-endian) exactly like the bare `u32`.
#[test]
fn be_nt() {
    use super::BeSer;

    assert_eq!(NT1.serialized_size().unwrap(), 4);

    let original = NT1;

    // The wire format is the four big-endian bytes of the inner value...
    let bytes = original.ser().unwrap();
    assert_eq!(bytes, hex_literal::hex!("0006 5223"));

    // ...which is identical to serializing the inner `u32` directly.
    assert_eq!(bytes, NT1_INNER.ser().unwrap());

    // Round trip back into the newtype.
    let decoded = NewTypeStruct::des(&bytes).unwrap();
    assert_eq!(original, decoded);
}
|
||||
|
||||
/// A newtype wrapper around `u32` must serialize (little-endian) exactly like the bare `u32`.
#[test]
fn le_nt() {
    use super::LeSer;

    assert_eq!(NT1.serialized_size().unwrap(), 4);

    let original = NT1;

    // The wire format is the four little-endian bytes of the inner value...
    let bytes = original.ser().unwrap();
    assert_eq!(bytes, hex_literal::hex!("2352 0600"));

    // ...which is identical to serializing the inner `u32` directly.
    assert_eq!(bytes, NT1_INNER.ser().unwrap());

    // Round trip back into the newtype.
    let decoded = NewTypeStruct::des(&bytes).unwrap();
    assert_eq!(original, decoded);
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, VariantNames};
|
||||
use tokio::time::Instant;
|
||||
use tracing::warn;
|
||||
|
||||
/// Logs a critical error, similarly to `tracing::error!`. This will:
|
||||
///
|
||||
@@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString {
|
||||
}
|
||||
}
|
||||
|
||||
/// Logs a periodic warning if a future is slow to complete.
|
||||
///
|
||||
/// This is performance-sensitive as it's used on the GetPage read path.
|
||||
#[inline]
|
||||
pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
|
||||
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
|
||||
// won't fit on the stack.
|
||||
let mut f = Box::pin(f);
|
||||
|
||||
let started = Instant::now();
|
||||
let mut attempt = 1;
|
||||
|
||||
loop {
|
||||
// NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
|
||||
// case where the timeout doesn't fire.
|
||||
let deadline = started + attempt * threshold;
|
||||
if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
|
||||
// NB: we check if we exceeded the threshold even if the timeout never fired, because
|
||||
// scheduling or execution delays may cause the future to succeed even if it exceeds the
|
||||
// timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
|
||||
// false negatives.
|
||||
let elapsed = started.elapsed();
|
||||
if elapsed >= threshold {
|
||||
warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
let elapsed = started.elapsed().as_secs_f64();
|
||||
warn!("slow {name} still running after {elapsed:.3}s",);
|
||||
|
||||
attempt += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
@@ -117,6 +117,10 @@ impl TenantShardId {
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns an inclusive range whose start and end are both this shard id.
pub fn range(&self) -> RangeInclusive<Self> {
    *self..=*self
}
|
||||
|
||||
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
|
||||
ShardSlug(self)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ package interpreted_wal;
|
||||
message InterpretedWalRecords {
|
||||
repeated InterpretedWalRecord records = 1;
|
||||
optional uint64 next_record_lsn = 2;
|
||||
optional uint64 raw_wal_start_lsn = 3;
|
||||
}
|
||||
|
||||
message InterpretedWalRecord {
|
||||
|
||||
@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
|
||||
pub records: Vec<InterpretedWalRecord>,
|
||||
// Start LSN of the next record after the batch.
|
||||
// Note that said record may not belong to the current shard.
|
||||
pub next_record_lsn: Option<Lsn>,
|
||||
pub next_record_lsn: Lsn,
|
||||
// Inclusive start LSN of the PG WAL from which the interpreted
|
||||
// WAL records were extracted. Note that this is not necessarily the
|
||||
// start LSN of the first interpreted record in the batch.
|
||||
pub raw_wal_start_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
|
||||
|
||||
@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(proto::InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(|l| l.0),
|
||||
next_record_lsn: Some(value.next_record_lsn.0),
|
||||
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
|
||||
|
||||
Ok(InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(Lsn::from),
|
||||
next_record_lsn: value
|
||||
.next_record_lsn
|
||||
.map(Lsn::from)
|
||||
.expect("Always provided"),
|
||||
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,6 +477,26 @@ impl Client {
|
||||
self.request(Method::POST, &uri, ()).await.map(|_| ())
|
||||
}
|
||||
|
||||
pub async fn timeline_download_heatmap_layers(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
concurrency: Option<usize>,
|
||||
) -> Result<()> {
|
||||
let mut path = reqwest::Url::parse(&format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/download_heatmap_layers",
|
||||
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
|
||||
))
|
||||
.expect("Cannot build URL");
|
||||
|
||||
if let Some(concurrency) = concurrency {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("concurrency", &format!("{}", concurrency));
|
||||
}
|
||||
|
||||
self.request(Method::POST, path, ()).await.map(|_| ())
|
||||
}
|
||||
|
||||
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/reset",
|
||||
|
||||
@@ -345,6 +345,7 @@ impl AuxFileV2 {
|
||||
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
|
||||
}
|
||||
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
|
||||
(3, 1) => AuxFileV2::Recognized("pg_stat/pgstat.stat", hash),
|
||||
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
|
||||
(0xff, 0xff) => AuxFileV2::Other(hash),
|
||||
_ => return None,
|
||||
|
||||
@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
|
||||
|
||||
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
|
||||
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
|
||||
const AUX_DIR_PG_STAT: u8 = 0x03;
|
||||
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
|
||||
|
||||
/// Encode the aux file into a fixed-size key.
|
||||
@@ -53,6 +54,7 @@ const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
|
||||
/// * pg_logical/replorigin_checkpoint -> 0x0103
|
||||
/// * pg_logical/others -> 0x01FF
|
||||
/// * pg_replslot/ -> 0x0201
|
||||
/// * pg_stat/pgstat.stat -> 0x0301
|
||||
/// * others -> 0xFFFF
|
||||
///
|
||||
/// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
|
||||
@@ -75,6 +77,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("pg_stat/") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
|
||||
} else {
|
||||
if cfg!(debug_assertions) {
|
||||
warn!(
|
||||
|
||||
@@ -264,6 +264,31 @@ where
|
||||
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
|
||||
// TODO include checksum
|
||||
|
||||
// Construct the pg_control file from the persisted checkpoint and pg_control
|
||||
// information. But we only add this to the tarball at the end, so that if the
|
||||
// writing is interrupted half-way through, the resulting incomplete tarball will
|
||||
// be missing the pg_control file, which prevents PostgreSQL from starting up on
|
||||
// it. With proper error handling, you should never try to start up from an
|
||||
// incomplete basebackup in the first place, of course, but this is a nice little
|
||||
// extra safety measure.
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
.get_checkpoint(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed to get checkpoint bytes")?;
|
||||
let pg_control_bytes = self
|
||||
.timeline
|
||||
.get_control_file(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed to get control bytes")?;
|
||||
let (pg_control_bytes, system_identifier, was_shutdown) =
|
||||
postgres_ffi::generate_pg_control(
|
||||
&pg_control_bytes,
|
||||
&checkpoint_bytes,
|
||||
self.lsn,
|
||||
self.timeline.pg_version,
|
||||
)?;
|
||||
|
||||
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
|
||||
|
||||
let pgversion = self.timeline.pg_version;
|
||||
@@ -401,6 +426,10 @@ where
|
||||
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
|
||||
// but now we should handle (skip) it for backward compatibility.
|
||||
continue;
|
||||
} else if path == "pg_stat/pgstat.stat" && !was_shutdown {
|
||||
// Drop statistics in case of abnormal termination, i.e. if we're not starting from the exact LSN
|
||||
// of a shutdown checkpoint.
|
||||
continue;
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
@@ -462,8 +491,9 @@ where
|
||||
)))
|
||||
});
|
||||
|
||||
// Generate pg_control and bootstrap WAL segment.
|
||||
self.add_pgcontrol_file().await?;
|
||||
// Last, add the pg_control file and bootstrap WAL segment.
|
||||
self.add_pgcontrol_file(pg_control_bytes, system_identifier)
|
||||
.await?;
|
||||
self.ar
|
||||
.finish()
|
||||
.await
|
||||
@@ -671,7 +701,11 @@ where
|
||||
// Add generated pg_control file and bootstrap WAL segment.
|
||||
// Also send zenith.signal file with extra bootstrap data.
|
||||
//
|
||||
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
|
||||
async fn add_pgcontrol_file(
|
||||
&mut self,
|
||||
pg_control_bytes: Bytes,
|
||||
system_identifier: u64,
|
||||
) -> Result<(), BasebackupError> {
|
||||
// add zenith.signal file
|
||||
let mut zenith_signal = String::new();
|
||||
if self.prev_record_lsn == Lsn(0) {
|
||||
@@ -694,24 +728,6 @@ where
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
|
||||
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
.get_checkpoint(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed to get checkpoint bytes")?;
|
||||
let pg_control_bytes = self
|
||||
.timeline
|
||||
.get_control_file(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed get control bytes")?;
|
||||
|
||||
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
|
||||
&pg_control_bytes,
|
||||
&checkpoint_bytes,
|
||||
self.lsn,
|
||||
self.timeline.pg_version,
|
||||
)?;
|
||||
|
||||
//send pg_control
|
||||
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
|
||||
self.ar
|
||||
|
||||
@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
|
||||
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
|
||||
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");
|
||||
|
||||
|
||||
@@ -197,6 +197,10 @@ pub struct PageServerConf {
|
||||
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
|
||||
/// files read.
|
||||
pub enable_read_path_debugging: bool,
|
||||
|
||||
/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
|
||||
/// safekeepers does not have gaps.
|
||||
pub validate_wal_contiguity: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -360,6 +364,7 @@ impl PageServerConf {
|
||||
page_service_pipelining,
|
||||
get_vectored_concurrent_io,
|
||||
enable_read_path_debugging,
|
||||
validate_wal_contiguity,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -446,6 +451,7 @@ impl PageServerConf {
|
||||
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
|
||||
no_sync: no_sync.unwrap_or(false),
|
||||
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
|
||||
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -98,6 +98,7 @@ pub struct RequestContext {
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
read_path_debug: bool,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -155,6 +156,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
read_path_debug: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -168,6 +170,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
read_path_debug: original.read_path_debug,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -191,6 +194,11 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the `read_path_debug` flag on the context under construction and
/// hands the builder back so that further calls can be chained.
pub(crate) fn read_path_debug(mut self, b: bool) -> Self {
    let ctx = &mut self.inner;
    ctx.read_path_debug = b;
    self
}
|
||||
|
||||
/// Consumes the builder and yields the [`RequestContext`] it has accumulated.
pub fn build(self) -> RequestContext {
    let built = self.inner;
    built
}
|
||||
@@ -291,4 +299,8 @@ impl RequestContext {
|
||||
/// Returns the [`PageContentKind`] recorded in this context.
pub(crate) fn page_content_kind(&self) -> PageContentKind {
    let kind = self.page_content_kind;
    kind
}
|
||||
|
||||
pub(crate) fn read_path_debug(&self) -> bool {
|
||||
self.read_path_debug
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
listen_https_port: None, // TODO: Support https.
|
||||
availability_zone_id: az_id.expect("Checked above"),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -824,6 +824,38 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantConfigResponse"
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers:
  parameters:
    - name: tenant_shard_id
      in: path
      required: true
      schema:
        type: string
    - name: timeline_id
      in: path
      required: true
      schema:
        type: string
    - name: concurrency
      description: Maximum number of concurrent downloads (capped at remote storage concurrency)
      in: query
      required: false
      schema:
        type: integer
  post:
    description: |
      Download all layers in the specified timeline's heatmap. The `tenant_shard_id` parameter
      may be used to target all shards of a tenant when the unsharded form is used, or a specific
      tenant shard with the sharded form.
    responses:
      # The handler replies with 202 (Accepted), not 200.
      "202":
        description: Accepted. The download of the heatmap layers has been started.
  delete:
    description: Stop any on-going background downloads of heatmap layers for the specified timeline.
    responses:
      "200":
        description: Success
|
||||
|
||||
/v1/utilization:
|
||||
get:
|
||||
description: |
|
||||
@@ -882,6 +914,8 @@ components:
|
||||
properties:
|
||||
reason:
|
||||
type: string
|
||||
gc_blocking:
|
||||
type: string
|
||||
|
||||
TenantCreateRequest:
|
||||
allOf:
|
||||
@@ -1083,6 +1117,9 @@ components:
|
||||
min_readable_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
applied_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
@@ -68,6 +68,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
@@ -1463,6 +1464,59 @@ async fn timeline_layer_scan_disposable_keys(
|
||||
)
|
||||
}
|
||||
|
||||
async fn timeline_download_heatmap_layers_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
// Only used in the case where remote storage is not configured.
|
||||
const DEFAULT_MAX_CONCURRENCY: usize = 100;
|
||||
// A conservative default.
|
||||
const DEFAULT_CONCURRENCY: usize = 16;
|
||||
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
let desired_concurrency =
|
||||
parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY);
|
||||
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let max_concurrency = get_config(&request)
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.map(|c| c.concurrency_limit())
|
||||
.unwrap_or(DEFAULT_MAX_CONCURRENCY);
|
||||
let concurrency = std::cmp::min(max_concurrency, desired_concurrency);
|
||||
|
||||
timeline.start_heatmap_layers_download(concurrency).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn timeline_shutdown_download_heatmap_layers_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
timeline.stop_and_drain_heatmap_layers_download().await;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn layer_download_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2341,6 +2395,7 @@ async fn timeline_checkpoint_handler(
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e)
|
||||
}
|
||||
)?;
|
||||
@@ -2518,14 +2573,30 @@ async fn deletion_queue_flush(
|
||||
}
|
||||
}
|
||||
|
||||
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
|
||||
async fn getpage_at_lsn_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
getpage_at_lsn_handler_inner(false, request, cancel).await
|
||||
}
|
||||
|
||||
async fn touchpage_at_lsn_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
getpage_at_lsn_handler_inner(true, request, cancel).await
|
||||
}
|
||||
|
||||
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
|
||||
async fn getpage_at_lsn_handler_inner(
|
||||
touch: bool,
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
// Require pageserver admin permission for this API instead of only tenant-level token.
|
||||
check_permission(&request, None)?;
|
||||
let state = get_state(&request);
|
||||
|
||||
struct Key(pageserver_api::key::Key);
|
||||
@@ -2540,22 +2611,29 @@ async fn getpage_at_lsn_handler(
|
||||
|
||||
let key: Key = parse_query_param(&request, "key")?
|
||||
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
|
||||
let lsn: Lsn = parse_query_param(&request, "lsn")?
|
||||
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
|
||||
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
// Enable read path debugging
|
||||
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
|
||||
// Use last_record_lsn if no lsn is provided
|
||||
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
let page = timeline.get(key.0, lsn, &ctx).await?;
|
||||
|
||||
Result::<_, ApiError>::Ok(
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(hyper::Body::from(page))
|
||||
.unwrap(),
|
||||
)
|
||||
if touch {
|
||||
json_response(StatusCode::OK, ())
|
||||
} else {
|
||||
Result::<_, ApiError>::Ok(
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(hyper::Body::from(page))
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
.await
|
||||
@@ -3626,6 +3704,14 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
|
||||
|r| api_handler(r, layer_map_info_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
|
||||
|r| api_handler(r, timeline_download_heatmap_layers_handler),
|
||||
)
|
||||
.delete(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
|
||||
|r| api_handler(r, timeline_shutdown_download_heatmap_layers_handler),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| api_handler(r, layer_download_handler),
|
||||
@@ -3682,6 +3768,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
|
||||
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",
|
||||
|r| api_handler(r, touchpage_at_lsn_handler),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|
||||
|r| api_handler(r, timeline_collect_keyspace),
|
||||
|
||||
@@ -34,11 +34,13 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use std::time::{Duration, Instant};
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::logging::warn_slow;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::spsc_fold;
|
||||
use utils::{
|
||||
@@ -81,6 +83,9 @@ use std::os::fd::AsRawFd;
|
||||
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
|
||||
/// Threshold at which to log a warning about slow GetPage requests.
|
||||
const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct Listener {
|
||||
@@ -594,6 +599,7 @@ struct BatchedTestRequest {
|
||||
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
|
||||
/// so that we don't keep the [`Timeline::gate`] open while the batch
|
||||
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
|
||||
#[derive(IntoStaticStr)]
|
||||
enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
@@ -638,6 +644,10 @@ enum BatchedFeMessage {
|
||||
}
|
||||
|
||||
impl BatchedFeMessage {
|
||||
fn as_static_str(&self) -> &'static str {
|
||||
self.into()
|
||||
}
|
||||
|
||||
fn observe_execution_start(&mut self, at: Instant) {
|
||||
match self {
|
||||
BatchedFeMessage::Exists { timer, .. }
|
||||
@@ -1463,17 +1473,20 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
let err = self
|
||||
.pagesteam_handle_batched_message(
|
||||
let result = warn_slow(
|
||||
msg.as_static_str(),
|
||||
WARN_SLOW_GETPAGE_THRESHOLD,
|
||||
self.pagesteam_handle_batched_message(
|
||||
pgb_writer,
|
||||
msg,
|
||||
io_concurrency.clone(),
|
||||
&cancel,
|
||||
protocol_version,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
match err {
|
||||
),
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(()) => {}
|
||||
Err(e) => break e,
|
||||
}
|
||||
@@ -1636,13 +1649,17 @@ impl PageServerHandler {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
self.pagesteam_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
io_concurrency.clone(),
|
||||
&cancel,
|
||||
protocol_version,
|
||||
&ctx,
|
||||
warn_slow(
|
||||
batch.as_static_str(),
|
||||
WARN_SLOW_GETPAGE_THRESHOLD,
|
||||
self.pagesteam_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
io_concurrency.clone(),
|
||||
&cancel,
|
||||
protocol_version,
|
||||
&ctx,
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -1799,6 +1816,13 @@ impl PageServerHandler {
|
||||
.as_millis()
|
||||
.to_string()
|
||||
});
|
||||
|
||||
info!(
|
||||
"acquired lease for {} until {}",
|
||||
lsn,
|
||||
valid_until_str.as_deref().unwrap_or("<unknown>")
|
||||
);
|
||||
|
||||
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
|
||||
@@ -45,7 +45,7 @@ use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
@@ -2264,6 +2264,13 @@ impl DatadirModification<'_> {
|
||||
self.tline.aux_file_size_estimator.on_add(content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
// Compute may request delete of old version of pgstat AUX file if new one exceeds size limit.
|
||||
// Compute doesn't know if previous version of this file exists or not, so
|
||||
// attempt to delete non-existing file can cause this message.
|
||||
// To avoid false alarms, log it as info rather than warning.
|
||||
(None, true) if path.starts_with("pg_stat/") => {
|
||||
info!("removing non-existing pg_stat file: {}", path)
|
||||
}
|
||||
(None, true) => warn!("removing non-existing aux file: {}", path),
|
||||
}
|
||||
let new_val = aux_file::encode_file_value(&new_files)?;
|
||||
|
||||
@@ -3101,6 +3101,9 @@ impl Tenant {
|
||||
if let Some(queue) = queue {
|
||||
outcome = queue
|
||||
.iteration(cancel, ctx, &self.gc_block, &timeline)
|
||||
.instrument(
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
@@ -3147,6 +3150,12 @@ impl Tenant {
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
CompactionError::CollectKeySpaceError(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
@@ -7846,18 +7855,6 @@ mod tests {
|
||||
}
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
// Force layers to L1
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if iter % 5 == 0 {
|
||||
let (_, before_delta_file_accessed) =
|
||||
@@ -7870,7 +7867,6 @@ mod tests {
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
@@ -8317,8 +8313,6 @@ mod tests {
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
|
||||
tline.force_set_disk_consistent_lsn(Lsn(0x40));
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
@@ -8332,7 +8326,8 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Image layers are created at repartition LSN
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
|
||||
.await
|
||||
|
||||
@@ -693,16 +693,15 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
|
||||
/// This is a conversion from our internal tenant config object to the one used
|
||||
/// in external APIs.
|
||||
impl From<TenantConfOpt> for models::TenantConfig {
|
||||
// TODO(vlad): These are now the same, but they have different serialization logic.
|
||||
// Can we merge them?
|
||||
fn from(value: TenantConfOpt) -> Self {
|
||||
fn humantime(d: Duration) -> String {
|
||||
format!("{}s", d.as_secs())
|
||||
}
|
||||
Self {
|
||||
checkpoint_distance: value.checkpoint_distance,
|
||||
checkpoint_timeout: value.checkpoint_timeout.map(humantime),
|
||||
checkpoint_timeout: value.checkpoint_timeout,
|
||||
compaction_algorithm: value.compaction_algorithm,
|
||||
compaction_target_size: value.compaction_target_size,
|
||||
compaction_period: value.compaction_period.map(humantime),
|
||||
compaction_period: value.compaction_period,
|
||||
compaction_threshold: value.compaction_threshold,
|
||||
compaction_upper_limit: value.compaction_upper_limit,
|
||||
compaction_l0_first: value.compaction_l0_first,
|
||||
@@ -711,24 +710,23 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
|
||||
l0_flush_wait_upload: value.l0_flush_wait_upload,
|
||||
gc_horizon: value.gc_horizon,
|
||||
gc_period: value.gc_period.map(humantime),
|
||||
gc_period: value.gc_period,
|
||||
image_creation_threshold: value.image_creation_threshold,
|
||||
pitr_interval: value.pitr_interval.map(humantime),
|
||||
walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
|
||||
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
|
||||
pitr_interval: value.pitr_interval,
|
||||
walreceiver_connect_timeout: value.walreceiver_connect_timeout,
|
||||
lagging_wal_timeout: value.lagging_wal_timeout,
|
||||
max_lsn_wal_lag: value.max_lsn_wal_lag,
|
||||
eviction_policy: value.eviction_policy,
|
||||
min_resident_size_override: value.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.map(humantime),
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
.evictions_low_residence_duration_metric_threshold,
|
||||
heatmap_period: value.heatmap_period,
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle,
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
image_creation_preempt_threshold: value.image_creation_preempt_threshold,
|
||||
lsn_lease_length: value.lsn_lease_length.map(humantime),
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
|
||||
lsn_lease_length: value.lsn_lease_length,
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
|
||||
timeline_offloading: value.timeline_offloading,
|
||||
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
|
||||
rel_size_v2_enabled: value.rel_size_v2_enabled,
|
||||
@@ -760,29 +758,10 @@ mod tests {
|
||||
assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_from_models_tenant_config_err() {
|
||||
let tenant_config = models::TenantConfig {
|
||||
lagging_wal_timeout: Some("5a".to_string()),
|
||||
..TenantConfig::default()
|
||||
};
|
||||
|
||||
let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
|
||||
|
||||
assert!(
|
||||
tenant_conf_opt.is_err(),
|
||||
"Suceeded to convert TenantConfig to TenantConfOpt"
|
||||
);
|
||||
|
||||
let expected_error_str =
|
||||
"lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
|
||||
assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_from_models_tenant_config_success() {
|
||||
let tenant_config = models::TenantConfig {
|
||||
lagging_wal_timeout: Some("5s".to_string()),
|
||||
lagging_wal_timeout: Some(Duration::from_secs(5)),
|
||||
..TenantConfig::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -51,8 +51,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::key::DBDIR_KEY;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -967,7 +966,10 @@ impl DeltaLayerInner {
|
||||
.as_slice()
|
||||
.iter()
|
||||
.filter_map(|(_, blob_meta)| {
|
||||
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
|
||||
if blob_meta.key.is_rel_dir_key()
|
||||
|| blob_meta.key == DBDIR_KEY
|
||||
|| blob_meta.key.is_aux_file_key()
|
||||
{
|
||||
// The size of values for these keys is unbounded and can
|
||||
// grow very large in pathological cases.
|
||||
None
|
||||
|
||||
@@ -48,8 +48,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::key::DBDIR_KEY;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -603,7 +602,10 @@ impl ImageLayerInner {
|
||||
.as_slice()
|
||||
.iter()
|
||||
.filter_map(|(_, blob_meta)| {
|
||||
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
|
||||
if blob_meta.key.is_rel_dir_key()
|
||||
|| blob_meta.key == DBDIR_KEY
|
||||
|| blob_meta.key.is_aux_file_key()
|
||||
{
|
||||
// The size of values for these keys is unbounded and can
|
||||
// grow very large in pathological cases.
|
||||
None
|
||||
|
||||
@@ -287,6 +287,7 @@ fn log_compaction_error(
|
||||
sleep_duration: Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
@@ -294,6 +295,8 @@ fn log_compaction_error(
|
||||
let level = match err {
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod delete;
|
||||
pub(crate) mod detach_ancestor;
|
||||
mod eviction_task;
|
||||
pub(crate) mod handle;
|
||||
mod heatmap_layers_downloader;
|
||||
pub(crate) mod import_pgdata;
|
||||
mod init;
|
||||
pub mod layer_manager;
|
||||
@@ -21,6 +22,7 @@ use chrono::{DateTime, Utc};
|
||||
use compaction::CompactionOutcome;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::FutureExt;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use handle::ShardTimelineId;
|
||||
use layer_manager::Shutdown;
|
||||
@@ -467,6 +469,10 @@ pub struct Timeline {
|
||||
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
|
||||
|
||||
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
|
||||
/// May host a background Tokio task which downloads all the layers from the current
|
||||
/// heatmap on demand.
|
||||
heatmap_layers_downloader: Mutex<Option<heatmap_layers_downloader::HeatmapLayersDownloader>>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -1293,7 +1299,7 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let read_path = if self.conf.enable_read_path_debugging {
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(keyspace.clone(), lsn))
|
||||
} else {
|
||||
None
|
||||
@@ -1876,7 +1882,7 @@ impl Timeline {
|
||||
// Signal compaction failure to avoid L0 flush stalls when it's broken.
|
||||
match result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(CompactionError::Other(_)) => {
|
||||
Err(CompactionError::Other(_)) | Err(CompactionError::CollectKeySpaceError(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
// Don't change the current value on offload failure or shutdown. We don't want to
|
||||
@@ -2039,6 +2045,11 @@ impl Timeline {
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// If we have a background task downloading heatmap layers stop it.
|
||||
// The background downloads are sensitive to timeline cancellation (done above),
|
||||
// so the drain will be immediate.
|
||||
self.stop_and_drain_heatmap_layers_download().await;
|
||||
|
||||
// Prevent new page service requests from starting.
|
||||
self.handles.shutdown();
|
||||
|
||||
@@ -2752,6 +2763,8 @@ impl Timeline {
|
||||
page_trace: Default::default(),
|
||||
|
||||
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
|
||||
|
||||
heatmap_layers_downloader: Mutex::new(None),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -2861,6 +2874,7 @@ impl Timeline {
|
||||
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
|
||||
availability_zone: self.conf.availability_zone.clone(),
|
||||
ingest_batch_size: self.conf.ingest_batch_size,
|
||||
validate_wal_contiguity: self.conf.validate_wal_contiguity,
|
||||
},
|
||||
broker_client,
|
||||
ctx,
|
||||
@@ -4592,7 +4606,10 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let (dense_ks, sparse_ks) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::CollectKeySpaceError)?;
|
||||
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
|
||||
let sparse_partitioning = SparseKeyPartitioning {
|
||||
parts: vec![sparse_ks],
|
||||
@@ -5113,20 +5130,26 @@ impl Timeline {
|
||||
// image layer generation taking too long time and blocking L0 compaction. So in this
|
||||
// mode, we also inspect the current number of L0 layers and skip image layer generation
|
||||
// if there are too many of them.
|
||||
let num_of_l0_layers = {
|
||||
let layers = self.layers.read().await;
|
||||
layers.layer_map()?.level0_deltas().len()
|
||||
};
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
// TODO: currently we do not respect `get_image_creation_preempt_threshold` and always yield
|
||||
// when there is a single timeline with more than L0 threshold L0 layers. As long as the
|
||||
// `get_image_creation_preempt_threshold` is set to a value greater than 0, we will yield for L0 compaction.
|
||||
if image_preempt_threshold != 0 {
|
||||
let should_yield = self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some();
|
||||
if should_yield {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers",
|
||||
partition.start().unwrap(), partition.end().unwrap()
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5155,14 +5178,16 @@ impl Timeline {
|
||||
.map(|l| l.metadata().file_size)
|
||||
.sum::<u64>();
|
||||
|
||||
info!(
|
||||
"created {} image layers ({} bytes) in {}s, processed {} out of {} partitions",
|
||||
image_layers.len(),
|
||||
total_layer_size,
|
||||
duration.as_secs_f64(),
|
||||
partition_processed,
|
||||
total_partitions
|
||||
);
|
||||
if !image_layers.is_empty() {
|
||||
info!(
|
||||
"created {} image layers ({} bytes) in {}s, processed {} out of {} partitions",
|
||||
image_layers.len(),
|
||||
total_layer_size,
|
||||
duration.as_secs_f64(),
|
||||
partition_processed,
|
||||
total_partitions
|
||||
);
|
||||
}
|
||||
|
||||
Ok((
|
||||
image_layers,
|
||||
@@ -5305,6 +5330,8 @@ pub(crate) enum CompactionError {
|
||||
#[error("Failed to offload timeline: {0}")]
|
||||
Offload(OffloadError),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error("Failed to collect keyspace: {0}")]
|
||||
CollectKeySpaceError(CollectKeySpaceError),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -5318,12 +5345,6 @@ impl From<OffloadError> for CompactionError {
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CollectKeySpaceError> for CompactionError {
|
||||
fn from(err: CollectKeySpaceError) -> Self {
|
||||
match err {
|
||||
@@ -6588,7 +6609,7 @@ impl TimelineWriter<'_> {
|
||||
|
||||
if let Some(wait_threshold) = wait_threshold {
|
||||
if l0_count >= wait_threshold {
|
||||
info!("layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers");
|
||||
debug!("layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers");
|
||||
self.tl.wait_flush_completion(flush_id).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,8 @@ use std::sync::Arc;
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, GetVectoredError,
|
||||
ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline,
|
||||
ImageLayerCreationMode, LastImageLayerCreationStatus, PageReconstructError, RecordedDuration,
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
@@ -25,12 +26,13 @@ use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, trace, warn, Instrument};
|
||||
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||
use utils::critical;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
@@ -301,18 +303,12 @@ impl GcCompactionQueue {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.gc_guards.insert(id, gc_guard);
|
||||
}
|
||||
let _ = timeline
|
||||
.compact_with_options(cancel, options, ctx)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
|
||||
.await?;
|
||||
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
self.notify_and_unblock(id);
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
let _ = timeline
|
||||
.compact_with_options(cancel, options, ctx)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
|
||||
.await?;
|
||||
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
|
||||
}
|
||||
GcCompactionQueueItem::Notify(id) => {
|
||||
self.notify_and_unblock(id);
|
||||
@@ -692,21 +688,6 @@ impl Timeline {
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
let l0_l1_boundary_lsn = {
|
||||
// We do the repartition on the L0-L1 boundary. All data below the boundary
|
||||
// are compacted by L0 with low read amplification, thus making the `repartition`
|
||||
// function run fast.
|
||||
let guard = self.layers.read().await;
|
||||
let l0_min_lsn = guard
|
||||
.layer_map()?
|
||||
.level0_deltas()
|
||||
.iter()
|
||||
.map(|l| l.get_lsn_range().start)
|
||||
.min()
|
||||
.unwrap_or(self.get_disk_consistent_lsn());
|
||||
l0_min_lsn.max(self.get_ancestor_lsn())
|
||||
};
|
||||
|
||||
// 1. L0 Compact
|
||||
let l0_outcome = {
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
@@ -733,86 +714,87 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
|
||||
if l0_l1_boundary_lsn < self.partitioning.read().1 {
|
||||
// We never go backwards when repartition and create image layers.
|
||||
info!("skipping image layer generation because repartition LSN is greater than L0-L1 boundary LSN.");
|
||||
} else {
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
l0_l1_boundary_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
!options.flags.contains(CompactFlags::NoYield),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
!options.flags.contains(CompactFlags::NoYield),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!(
|
||||
"could not compact, repartitioning keyspace failed: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(CompactionError::ShuttingDown) => {}
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(
|
||||
err @ CompactionError::CollectKeySpaceError(
|
||||
CollectKeySpaceError::Decode(_)
|
||||
| CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
|
||||
),
|
||||
),
|
||||
) => critical!("could not compact, repartitioning keyspace failed: {err:?}"),
|
||||
|
||||
// Log other errors. No partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
|
||||
// key-value store, ignoring the datadir layout. Log the error but continue.
|
||||
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
|
||||
};
|
||||
|
||||
let partition_count = self.partitioning.read().0 .0.parts.len();
|
||||
|
||||
@@ -2230,7 +2212,7 @@ impl Timeline {
|
||||
let sub_compaction_max_job_size_mb =
|
||||
sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
|
||||
|
||||
let mut compact_jobs = Vec::new();
|
||||
let mut compact_jobs = Vec::<GcCompactJob>::new();
|
||||
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
|
||||
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
|
||||
let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
|
||||
@@ -2317,16 +2299,25 @@ impl Timeline {
|
||||
} else {
|
||||
end
|
||||
};
|
||||
info!(
|
||||
"splitting compaction job: {}..{}, estimated_size={}",
|
||||
start, end, total_size
|
||||
);
|
||||
compact_jobs.push(GcCompactJob {
|
||||
dry_run: job.dry_run,
|
||||
compact_key_range: start..end,
|
||||
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
|
||||
});
|
||||
current_start = Some(end);
|
||||
if total_size == 0 && !compact_jobs.is_empty() {
|
||||
info!(
|
||||
"splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
|
||||
start, end, total_size
|
||||
);
|
||||
compact_jobs.last_mut().unwrap().compact_key_range.end = end;
|
||||
current_start = Some(end);
|
||||
} else {
|
||||
info!(
|
||||
"splitting compaction job: {}..{}, estimated_size={}",
|
||||
start, end, total_size
|
||||
);
|
||||
compact_jobs.push(GcCompactJob {
|
||||
dry_run: job.dry_run,
|
||||
compact_key_range: start..end,
|
||||
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
|
||||
});
|
||||
current_start = Some(end);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(compact_jobs)
|
||||
|
||||
162
pageserver/src/tenant/timeline/heatmap_layers_downloader.rs
Normal file
162
pageserver/src/tenant/timeline/heatmap_layers_downloader.rs
Normal file
@@ -0,0 +1,162 @@
|
||||
//! Timeline utility module to hydrate everything from the current heatmap.
|
||||
//!
|
||||
//! Provides utilities to spawn and abort a background task where the downloads happen.
|
||||
//! See /v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers.
|
||||
|
||||
use futures::StreamExt;
|
||||
use http_utils::error::ApiError;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use super::Timeline;
|
||||
|
||||
// This status is not strictly necessary now, but gives us a nice place
|
||||
// to store progress information if we ever wish to expose it.
|
||||
pub(super) enum HeatmapLayersDownloadStatus {
|
||||
InProgress,
|
||||
Complete,
|
||||
}
|
||||
|
||||
pub(super) struct HeatmapLayersDownloader {
|
||||
handle: tokio::task::JoinHandle<()>,
|
||||
status: Arc<Mutex<HeatmapLayersDownloadStatus>>,
|
||||
cancel: CancellationToken,
|
||||
downloads_guard: Arc<Gate>,
|
||||
}
|
||||
|
||||
impl HeatmapLayersDownloader {
|
||||
fn new(
|
||||
timeline: Arc<Timeline>,
|
||||
concurrency: usize,
|
||||
) -> Result<HeatmapLayersDownloader, ApiError> {
|
||||
let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
|
||||
|
||||
let cancel = timeline.cancel.child_token();
|
||||
let downloads_guard = Arc::new(Gate::default());
|
||||
|
||||
let status = Arc::new(Mutex::new(HeatmapLayersDownloadStatus::InProgress));
|
||||
|
||||
let handle = tokio::task::spawn({
|
||||
let status = status.clone();
|
||||
let downloads_guard = downloads_guard.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
let _guard = tl_guard;
|
||||
|
||||
scopeguard::defer! {
|
||||
*status.lock().unwrap() = HeatmapLayersDownloadStatus::Complete;
|
||||
}
|
||||
|
||||
let Some(heatmap) = timeline.generate_heatmap().await else {
|
||||
tracing::info!("Heatmap layers download failed to generate heatmap");
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
heatmap_layers=%heatmap.layers.len(),
|
||||
"Starting heatmap layers download"
|
||||
);
|
||||
|
||||
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|
||||
|layer| {
|
||||
let tl = timeline.clone();
|
||||
let dl_guard = match downloads_guard.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
// [`Self::shutdown`] was called. Don't spawn any more downloads.
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some(async move {
|
||||
let _dl_guard = dl_guard;
|
||||
|
||||
let res = tl.download_layer(&layer.name).await;
|
||||
if let Err(err) = res {
|
||||
if !err.is_cancelled() {
|
||||
tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
)).buffered(concurrency);
|
||||
|
||||
tokio::select! {
|
||||
_ = stream.collect::<()>() => {
|
||||
tracing::info!(
|
||||
resident_size=%timeline.resident_physical_size(),
|
||||
"Heatmap layers download completed"
|
||||
);
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Heatmap layers download cancelled");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
status,
|
||||
handle,
|
||||
cancel,
|
||||
downloads_guard,
|
||||
})
|
||||
}
|
||||
|
||||
fn is_complete(&self) -> bool {
|
||||
matches!(
|
||||
*self.status.lock().unwrap(),
|
||||
HeatmapLayersDownloadStatus::Complete
|
||||
)
|
||||
}
|
||||
|
||||
/// Drive any in-progress downloads to completion and stop spawning any new ones.
|
||||
///
|
||||
/// This has two callers and they behave differently
|
||||
/// 1. [`Timeline::shutdown`]: the drain will be immediate since downloads themselves
|
||||
/// are sensitive to timeline cancellation.
|
||||
///
|
||||
/// 2. Endpoint handler in [`crate::http::routes`]: the drain will wait for any in-progress
|
||||
/// downloads to complete.
|
||||
async fn stop_and_drain(self) {
|
||||
// Counterintuitive: close the guard before cancelling.
|
||||
// Something needs to poll the already created download futures to completion.
|
||||
// If we cancel first, then the underlying task exits and we lose
|
||||
// the poller.
|
||||
self.downloads_guard.close().await;
|
||||
self.cancel.cancel();
|
||||
if let Err(err) = self.handle.await {
|
||||
tracing::warn!("Failed to join heatmap layer downloader task: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) async fn start_heatmap_layers_download(
|
||||
self: &Arc<Self>,
|
||||
concurrency: usize,
|
||||
) -> Result<(), ApiError> {
|
||||
let mut locked = self.heatmap_layers_downloader.lock().unwrap();
|
||||
if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
|
||||
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
|
||||
*locked = Some(dl);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ApiError::Conflict("Already running".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn stop_and_drain_heatmap_layers_download(&self) {
|
||||
// This can race with the start of a new downloader and lead to a situation
|
||||
// where one downloader is shutting down and another one is in-flight.
|
||||
// The only impact is that we'd end up using more remote storage semaphore
|
||||
// units than expected.
|
||||
let downloader = self.heatmap_layers_downloader.lock().unwrap().take();
|
||||
if let Some(dl) = downloader {
|
||||
dl.stop_and_drain().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -56,6 +56,7 @@ pub struct WalReceiverConf {
|
||||
pub auth_token: Option<Arc<String>>,
|
||||
pub availability_zone: Option<String>,
|
||||
pub ingest_batch_size: u64,
|
||||
pub validate_wal_contiguity: bool,
|
||||
}
|
||||
|
||||
pub struct WalReceiver {
|
||||
|
||||
@@ -537,6 +537,7 @@ impl ConnectionManagerState {
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
let protocol = self.conf.protocol;
|
||||
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
@@ -558,6 +559,7 @@ impl ConnectionManagerState {
|
||||
ctx,
|
||||
node_id,
|
||||
ingest_batch_size,
|
||||
validate_wal_contiguity,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1563,6 +1565,7 @@ mod tests {
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
ingest_batch_size: 1,
|
||||
validate_wal_contiguity: false,
|
||||
},
|
||||
wal_connection: None,
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
|
||||
@@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
ctx: RequestContext,
|
||||
safekeeper_node: NodeId,
|
||||
ingest_batch_size: u64,
|
||||
validate_wal_contiguity: bool,
|
||||
) -> Result<(), WalReceiverError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
} => Some((format, compression)),
|
||||
};
|
||||
|
||||
let mut expected_wal_start = startpoint;
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
@@ -340,13 +342,49 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
)
|
||||
})?;
|
||||
|
||||
// Guard against WAL gaps. If the start LSN of the PG WAL section
|
||||
// from which the interpreted records were extracted, doesn't match
|
||||
// the end of the previous batch (or the starting point for the first batch),
|
||||
// then kill this WAL receiver connection and start a new one.
|
||||
if validate_wal_contiguity {
|
||||
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
|
||||
match raw_wal_start_lsn.cmp(&expected_wal_start) {
|
||||
std::cmp::Ordering::Greater => {
|
||||
let msg = format!(
|
||||
"Gap in streamed WAL: [{}, {})",
|
||||
expected_wal_start, raw_wal_start_lsn
|
||||
);
|
||||
critical!("{msg}");
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
std::cmp::Ordering::Less => {
|
||||
// Other shards are reading WAL behind us.
|
||||
// This is valid, but check that we received records
|
||||
// that we haven't seen before.
|
||||
if let Some(first_rec) = batch.records.first() {
|
||||
if first_rec.next_record_lsn < last_rec_lsn {
|
||||
let msg = format!(
|
||||
"Received record with next_record_lsn multiple times ({} < {})",
|
||||
first_rec.next_record_lsn, expected_wal_start
|
||||
);
|
||||
critical!("{msg}");
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
}
|
||||
}
|
||||
std::cmp::Ordering::Equal => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn,
|
||||
raw_wal_start_lsn: _,
|
||||
} = batch;
|
||||
|
||||
tracing::debug!(
|
||||
"Received WAL up to {} with next_record_lsn={:?}",
|
||||
"Received WAL up to {} with next_record_lsn={}",
|
||||
streaming_lsn,
|
||||
next_record_lsn
|
||||
);
|
||||
@@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match next_record_lsn {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
|
||||
modification.set_lsn(next_record_lsn).unwrap();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
@@ -446,9 +483,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
if let Some(lsn) = next_record_lsn {
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
last_rec_lsn = next_record_lsn;
|
||||
expected_wal_start = streaming_lsn;
|
||||
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
@@ -1180,6 +1180,50 @@ impl WalIngest {
|
||||
} else {
|
||||
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
// NB: We abuse the Checkpoint.redo field:
|
||||
//
|
||||
// - In PostgreSQL, the Checkpoint struct doesn't store the information
|
||||
// of whether this is an online checkpoint or a shutdown checkpoint. It's
|
||||
// stored in the XLOG info field of the WAL record, shutdown checkpoints
|
||||
// use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
|
||||
// XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
|
||||
// in the pageserver, however.
|
||||
//
|
||||
// - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
|
||||
// checkpoint record, if it's a shutdown checkpoint. But when we are
|
||||
// starting from a shutdown checkpoint, the basebackup LSN is the *end*
|
||||
// of the shutdown checkpoint WAL record. That makes it difficult to
|
||||
// correctly detect whether we're starting from a shutdown record or
|
||||
// not.
|
||||
//
|
||||
// To address both of those issues, we store 0 in the redo field if it's
|
||||
// an online checkpoint record, and the record's *end* LSN if it's a
|
||||
// shutdown checkpoint. We don't need the original redo pointer in neon,
|
||||
// because we don't perform WAL replay at startup anyway, so we can get
|
||||
// away with abusing the redo field like this.
|
||||
//
|
||||
// XXX: Ideally, we would persist the extra information in a more
|
||||
// explicit format, rather than repurpose the fields of the Postgres
|
||||
// struct like this. However, we already have persisted data like this,
|
||||
// so we need to maintain backwards compatibility.
|
||||
//
|
||||
// NB: We didn't originally have this convention, so there are still old
|
||||
// persisted records that didn't do this. Before, we didn't update the
|
||||
// persisted redo field at all. That means that old records have a bogus
|
||||
// redo pointer that points to some old value, from the checkpoint record
|
||||
// that was originally imported from the data directory. If it was a
|
||||
// project created in Neon, that means it points to the first checkpoint
|
||||
// after initdb. That's OK for our purposes: all such old checkpoints are
|
||||
// treated as old online checkpoints when the basebackup is created.
|
||||
cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
|
||||
// Store the *end* LSN of the checkpoint record. Or to be precise,
|
||||
// the start LSN of the *next* record, i.e. if the record ends
|
||||
// exactly at page boundary, the redo LSN points to just after the
|
||||
// page header on the next page.
|
||||
lsn.into()
|
||||
} else {
|
||||
Lsn::INVALID.into()
|
||||
};
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
|
||||
@@ -136,7 +136,9 @@ impl WalRedoProcess {
|
||||
Ok(0) => break Ok(()), // eof
|
||||
Ok(num_bytes) => {
|
||||
let output = String::from_utf8_lossy(&buf[..num_bytes]);
|
||||
error!(%output, "received output");
|
||||
if !output.contains("LOG:") {
|
||||
error!(%output, "received output");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
break Err(e);
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
|
||||
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
|
||||
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
|
||||
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
|
||||
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
|
||||
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
|
||||
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
|
||||
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");
|
||||
|
||||
@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_READ;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
|
||||
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
|
||||
extern uint32 WAIT_EVENT_NEON_PS_SEND;
|
||||
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
|
||||
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
|
||||
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
|
||||
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
|
||||
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
|
||||
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION
|
||||
|
||||
@@ -233,6 +233,7 @@ extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
extern bool lfc_store_prefetch_result;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
@@ -301,14 +302,16 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno);
|
||||
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, int nblocks, bits8 *bitmap);
|
||||
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
const void* buffer, XLogRecPtr lsn);
|
||||
|
||||
|
||||
static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
void *buffer)
|
||||
{
|
||||
bits8 rv = 0;
|
||||
bits8 rv = 1;
|
||||
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ static uint32 local_request_counter;
|
||||
* UNUSED ------> REQUESTED --> RECEIVED
|
||||
* ^ : | |
|
||||
* | : v |
|
||||
* | : TAG_UNUSED |
|
||||
* | : TAG_REMAINS |
|
||||
* | : | |
|
||||
* +----------------+------------+
|
||||
* :
|
||||
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
|
||||
/* must fit in uint8; bits 0x1 are used */
|
||||
typedef enum {
|
||||
PRFSF_NONE = 0x0,
|
||||
PRFSF_SEQ = 0x1,
|
||||
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
|
||||
} PrefetchRequestFlags;
|
||||
|
||||
typedef struct PrefetchRequest
|
||||
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
|
||||
BlockNumber blkno, neon_request_lsns *output,
|
||||
BlockNumber nblocks, const bits8 *mask);
|
||||
BlockNumber nblocks);
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
|
||||
@@ -363,6 +363,7 @@ compact_prefetch_buffers(void)
|
||||
target_slot->buftag = source_slot->buftag;
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->flags = source_slot->flags;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->reqid = source_slot->reqid;
|
||||
target_slot->request_lsns = source_slot->request_lsns;
|
||||
@@ -452,6 +453,18 @@ prefetch_pump_state(void)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock)
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,8 +487,7 @@ readahead_buffer_resize(int newsize, void *extra)
|
||||
*/
|
||||
if (MyPState->n_requests_inflight > newsize)
|
||||
{
|
||||
Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize);
|
||||
prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize));
|
||||
prefetch_wait_for(MyPState->ring_unused - newsize - 1);
|
||||
Assert(MyPState->n_requests_inflight <= newsize);
|
||||
}
|
||||
|
||||
@@ -714,6 +726,18 @@ prefetch_read(PrefetchRequest *slot)
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
|
||||
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
|
||||
{
|
||||
/*
|
||||
* Store prefetched result in LFC (please read comments to lfc_prefetch
|
||||
* explaining why it can be done without holding shared buffer lock)
|
||||
*/
|
||||
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
|
||||
{
|
||||
slot->flags |= PRFSF_LFC;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
@@ -865,7 +889,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
else
|
||||
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum,
|
||||
&slot->request_lsns, 1, NULL);
|
||||
&slot->request_lsns, 1);
|
||||
request.hdr.lsn = slot->request_lsns.request_lsn;
|
||||
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
@@ -891,6 +915,73 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
|
||||
Assert(!found);
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
|
||||
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
|
||||
*/
|
||||
static int
|
||||
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
|
||||
BlockNumber nblocks, void **buffers, bits8 *mask)
|
||||
{
|
||||
int hits = 0;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
/*
|
||||
* Use an intermediate PrefetchRequest struct as the hash key to ensure
|
||||
* correct alignment and that the padding bytes are cleared.
|
||||
*/
|
||||
memset(&hashkey.buftag, 0, sizeof(BufferTag));
|
||||
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
|
||||
hashkey.buftag.forkNum = forknum;
|
||||
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrfHashEntry *entry;
|
||||
|
||||
hashkey.buftag.blockNum = blocknum + i;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
PrefetchRequest *slot = entry->slot;
|
||||
uint64 ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(ring_index));
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
|
||||
|
||||
if (slot->status != PRFS_RECEIVED)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* If the caller specified a request LSN to use, only accept
|
||||
* prefetch responses that satisfy that request.
|
||||
*/
|
||||
if (!neon_prefetch_response_usable(&lsns[i], slot))
|
||||
continue;
|
||||
|
||||
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
|
||||
prefetch_set_unused(ring_index);
|
||||
BITMAP_SET(mask, i);
|
||||
|
||||
hits += 1;
|
||||
}
|
||||
}
|
||||
pgBufferUsage.prefetch.hits += hits;
|
||||
return hits;
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
/* Single-block convenience wrapper around prefetch_lookupv(); compiled only when PG_MAJORVERSION_NUM < 17. */
/* Returns true when prefetch_lookupv() reports a hit for block "blkn". */
static bool
|
||||
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
|
||||
{
|
||||
/* One-bit hit mask required by prefetch_lookupv(); only the returned hit count is inspected. */
bits8 present = 0;
|
||||
return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* prefetch_register_bufferv() - register and prefetch buffers
|
||||
*
|
||||
@@ -1014,8 +1105,6 @@ Retry:
|
||||
/* The buffered request is good enough, return that index */
|
||||
if (is_prefetch)
|
||||
pgBufferUsage.prefetch.duplicates++;
|
||||
else
|
||||
pgBufferUsage.prefetch.hits++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1117,6 +1206,7 @@ Retry:
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
@@ -2057,8 +2147,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
*/
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *output, BlockNumber nblocks,
|
||||
const bits8 *mask)
|
||||
neon_request_lsns *output, BlockNumber nblocks)
|
||||
{
|
||||
XLogRecPtr last_written_lsns[PG_IOV_MAX];
|
||||
|
||||
@@ -2146,9 +2235,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
if (last_written_lsn > replay_lsn)
|
||||
{
|
||||
/* GetCurrentReplayRecPtr was introduced in v15 */
|
||||
@@ -2191,8 +2277,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
@@ -2414,7 +2498,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.hdr.tag = T_NeonExistsRequest,
|
||||
@@ -2833,8 +2917,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
while (nblocks > 0)
|
||||
{
|
||||
int iterblocks = Min(nblocks, PG_IOV_MAX);
|
||||
bits8 lfc_present[PG_IOV_MAX / 8];
|
||||
memset(lfc_present, 0, sizeof(lfc_present));
|
||||
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
|
||||
|
||||
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
iterblocks, lfc_present) == iterblocks)
|
||||
@@ -2845,12 +2928,13 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
}
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
nblocks -= iterblocks;
|
||||
blocknum += iterblocks;
|
||||
|
||||
@@ -3106,7 +3190,8 @@ Retry:
|
||||
}
|
||||
}
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
if (!lfc_store_prefetch_result)
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
@@ -3191,6 +3276,17 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
|
||||
{
|
||||
/* Prefetch hit */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Try to read from local file cache */
|
||||
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
|
||||
{
|
||||
@@ -3198,9 +3294,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3281,11 +3379,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static void
|
||||
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
void **buffers, BlockNumber nblocks)
|
||||
{
|
||||
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
|
||||
bits8 lfc_hits[PG_IOV_MAX / 8];
|
||||
bits8 read[PG_IOV_MAX / 8];
|
||||
neon_request_lsns request_lsns[PG_IOV_MAX];
|
||||
int lfc_result;
|
||||
int prefetch_result;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -3308,38 +3409,52 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
neon_log(ERROR, "Read request too large: %d is larger than max %d",
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
memset(read, 0, sizeof(read));
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
|
||||
|
||||
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
|
||||
|
||||
if (prefetch_result == nblocks)
|
||||
return;
|
||||
|
||||
/* invert the result: exclude prefetched blocks */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_hits[i] = ~prefetch_hits[i];
|
||||
|
||||
/* Try to read from local file cache */
|
||||
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
|
||||
nblocks, read);
|
||||
nblocks, lfc_hits);
|
||||
|
||||
if (lfc_result > 0)
|
||||
MyNeonCounters->file_cache_hits_total += lfc_result;
|
||||
|
||||
/* Read all blocks from LFC, so we're done */
|
||||
if (lfc_result == nblocks)
|
||||
if (prefetch_result + lfc_result == nblocks)
|
||||
return;
|
||||
|
||||
if (lfc_result == -1)
|
||||
if (lfc_result <= 0)
|
||||
{
|
||||
/* can't use the LFC result, so read all blocks from PS */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = 0xFF;
|
||||
read[i] = ~prefetch_hits[i];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* invert the result: exclude blocks read from lfc */
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
read[i] = ~(read[i]);
|
||||
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks, read);
|
||||
|
||||
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
|
||||
buffers, nblocks, read);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3611,7 +3726,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
@@ -3696,7 +3811,7 @@ neon_dbsize(Oid dbNode)
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
@@ -4431,7 +4546,12 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
if (no_redo_needed)
|
||||
{
|
||||
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
|
||||
lfc_evict(rinfo, forknum, blkno);
|
||||
/*
|
||||
* Redo changes if page exists in LFC.
|
||||
* We should perform this check after assigning LwLSN to prevent
|
||||
* prefetching of some older version of the page by some other backend.
|
||||
*/
|
||||
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
@@ -32,8 +32,8 @@
|
||||
|
||||
#include "inmem_smgr.h"
|
||||
|
||||
/* Size of the in-memory smgr */
|
||||
#define MAX_PAGES 64
|
||||
/* Size of the in-memory smgr: XLR_MAX_BLOCK_ID is 32, but we can update up to 3 forks for each block */
|
||||
#define MAX_PAGES 100
|
||||
|
||||
/* If more than WARN_PAGES are used, print a warning in the log */
|
||||
#define WARN_PAGES 32
|
||||
@@ -285,12 +285,12 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
* WARN_PAGES, print a warning so that we get alerted and get to
|
||||
* investigate why we're accessing so many buffers.
|
||||
*/
|
||||
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
|
||||
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
blocknum,
|
||||
used_pages);
|
||||
if (used_pages >= WARN_PAGES)
|
||||
ereport(WARNING, (errmsg("inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
blocknum,
|
||||
used_pages), errbacktrace()));
|
||||
if (used_pages == MAX_PAGES)
|
||||
elog(ERROR, "Inmem storage overflow");
|
||||
|
||||
|
||||
@@ -142,7 +142,7 @@ static BufferTag target_redo_tag;
|
||||
|
||||
static XLogReaderState *reader_state;
|
||||
|
||||
#define TRACE LOG
|
||||
#define TRACE DEBUG1
|
||||
|
||||
#ifdef HAVE_LIBSECCOMP
|
||||
|
||||
@@ -194,6 +194,7 @@ static PgSeccompRule allowed_syscalls[] =
|
||||
* is stored in MyProcPid anyway.
|
||||
*/
|
||||
PG_SCMP_ALLOW(getpid),
|
||||
PG_SCMP_ALLOW(futex), /* needed for errbacktrace */
|
||||
|
||||
/* Enable those for a proper shutdown. */
|
||||
#if 0
|
||||
@@ -253,7 +254,7 @@ WalRedoMain(int argc, char *argv[])
|
||||
* which is super strange but that's not something we can solve
|
||||
* for here. ¯\_(-_-)_/¯
|
||||
*/
|
||||
SetConfigOption("log_min_messages", "FATAL", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
SetConfigOption("log_min_messages", "WARNING", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
SetConfigOption("client_min_messages", "ERROR", PGC_SUSET,
|
||||
PGC_S_OVERRIDE);
|
||||
|
||||
@@ -758,6 +759,11 @@ BeginRedoForBlock(StringInfo input_message)
|
||||
{
|
||||
reln->smgr_cached_nblocks[forknum] = blknum + 1;
|
||||
}
|
||||
if (target_redo_tag.forkNum == MAIN_FORKNUM)
|
||||
{
|
||||
reln->smgr_cached_nblocks[FSM_FORKNUM] = MaxBlockNumber;
|
||||
reln->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = MaxBlockNumber;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1053,6 +1059,9 @@ GetPage(StringInfo input_message)
|
||||
DropRelationAllLocalBuffers(rinfo);
|
||||
wal_redo_buffer = InvalidBuffer;
|
||||
|
||||
/* Remove relation from SMGR relation cache */
|
||||
AtEOXact_SMgr();
|
||||
|
||||
elog(TRACE, "Page sent back for block %u", blknum);
|
||||
}
|
||||
|
||||
|
||||
@@ -279,9 +279,12 @@ impl ClientInnerCommon<postgres_client::Client> {
|
||||
local_data.jti += 1;
|
||||
let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
|
||||
|
||||
// `discard all` cannot run in a transaction; it must be executed alone.
|
||||
self.inner.batch_execute("discard all").await?;
|
||||
|
||||
// initiates the auth session
|
||||
// this is safe from query injections as the jwt format is free of any escape characters.
|
||||
let query = format!("discard all; select auth.jwt_session_init('{token}')");
|
||||
let query = format!("select auth.jwt_session_init('{token}')");
|
||||
self.inner.batch_execute(&query).await?;
|
||||
|
||||
let pid = self.inner.get_process_id();
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.84.1"
|
||||
channel = "1.85.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -5,7 +5,10 @@
|
||||
|
||||
use http_utils::error::HttpErrorBody;
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_api::models::{
|
||||
PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
|
||||
TimelineStatus,
|
||||
};
|
||||
use std::error::Error as _;
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
@@ -88,6 +91,12 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Calls the `/v1/pull_timeline` endpoint under `mgmt_api_endpoint`.
///
/// Passes `req` to `self.post` and decodes the response body as JSON into a
/// [`PullTimelineResponse`]. A failure to decode the body is reported as
/// `Error::ReceiveBody`; errors from `self.post` are propagated unchanged.
pub async fn pull_timeline(&self, req: &PullTimelineRequest) -> Result<PullTimelineResponse> {
|
||||
let uri = format!("{}/v1/pull_timeline", self.mgmt_api_endpoint);
|
||||
let resp = self.post(&uri, req).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -128,7 +137,7 @@ impl Client {
|
||||
}
|
||||
|
||||
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
|
||||
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
|
||||
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -235,7 +235,7 @@ impl Storage for FileStorage {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use safekeeper_api::membership::{Configuration, MemberSet};
|
||||
use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -246,7 +246,7 @@ mod test {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
let mut state = TimelinePersistentState::empty();
|
||||
state.mconf = Configuration {
|
||||
generation: 42,
|
||||
generation: SafekeeperGeneration::new(42),
|
||||
members: MemberSet::empty(),
|
||||
new_members: None,
|
||||
};
|
||||
|
||||
@@ -2,6 +2,7 @@ use http_utils::failpoints::failpoints_handler;
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use safekeeper_api::models;
|
||||
use safekeeper_api::models::AcceptorStateStatus;
|
||||
use safekeeper_api::models::PullTimelineRequest;
|
||||
use safekeeper_api::models::SafekeeperStatus;
|
||||
use safekeeper_api::models::TermSwitchApiEntry;
|
||||
use safekeeper_api::models::TimelineStatus;
|
||||
@@ -230,7 +231,7 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
|
||||
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||
let data: PullTimelineRequest = json_request(&mut request).await?;
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
|
||||
@@ -4,10 +4,13 @@ use camino::Utf8PathBuf;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use safekeeper_api::{models::TimelineStatus, Term};
|
||||
use safekeeper_api::{
|
||||
models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus},
|
||||
Term,
|
||||
};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use safekeeper_client::mgmt_api::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
cmp::min,
|
||||
io::{self, ErrorKind},
|
||||
@@ -33,7 +36,7 @@ use crate::{
|
||||
};
|
||||
use utils::{
|
||||
crashsafe::fsync_async_opt,
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
id::{NodeId, TenantTimelineId},
|
||||
logging::SecretString,
|
||||
lsn::Lsn,
|
||||
pausable_failpoint,
|
||||
@@ -378,21 +381,6 @@ impl WalResidentTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// pull_timeline request body.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Request {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Response {
|
||||
// Donor safekeeper host
|
||||
pub safekeeper_host: String,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
/// Response for debug dump request.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct DebugDumpResponse {
|
||||
@@ -405,10 +393,10 @@ pub struct DebugDumpResponse {
|
||||
|
||||
/// Find the most advanced safekeeper and pull timeline from it.
|
||||
pub async fn handle_request(
|
||||
request: Request,
|
||||
request: PullTimelineRequest,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<Response> {
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
@@ -460,7 +448,7 @@ async fn pull_timeline(
|
||||
host: String,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<Response> {
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
|
||||
@@ -535,7 +523,7 @@ async fn pull_timeline(
|
||||
.load_temp_timeline(ttid, &tli_dir_path, false)
|
||||
.await?;
|
||||
|
||||
Ok(Response {
|
||||
Ok(PullTimelineResponse {
|
||||
safekeeper_host: host,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1004,7 +1004,7 @@ mod tests {
|
||||
|
||||
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
|
||||
use safekeeper_api::{
|
||||
membership::{Configuration, MemberSet, SafekeeperId},
|
||||
membership::{Configuration, MemberSet, SafekeeperGeneration, SafekeeperId},
|
||||
ServerInfo,
|
||||
};
|
||||
|
||||
@@ -1303,7 +1303,7 @@ mod tests {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
mconf: Configuration {
|
||||
generation: 42,
|
||||
generation: SafekeeperGeneration::new(42),
|
||||
members: MemberSet::new(vec![SafekeeperId {
|
||||
id: NodeId(1),
|
||||
host: "hehe.org".to_owned(),
|
||||
|
||||
@@ -295,6 +295,10 @@ impl InterpretedWalReader {
|
||||
|
||||
let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
|
||||
|
||||
// Tracks the start of the PG WAL LSN from which the current batch of
|
||||
// interpreted records originated.
|
||||
let mut current_batch_wal_start_lsn: Option<Lsn> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Main branch for reading WAL and forwarding it
|
||||
@@ -302,7 +306,7 @@ impl InterpretedWalReader {
|
||||
let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
|
||||
let WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: _,
|
||||
wal_start_lsn,
|
||||
wal_end_lsn,
|
||||
available_wal_end_lsn,
|
||||
} = match wal {
|
||||
@@ -315,6 +319,12 @@ impl InterpretedWalReader {
|
||||
}
|
||||
};
|
||||
|
||||
// We will already have a value if the previous chunks of WAL
|
||||
// did not decode into anything useful.
|
||||
if current_batch_wal_start_lsn.is_none() {
|
||||
current_batch_wal_start_lsn = Some(wal_start_lsn);
|
||||
}
|
||||
|
||||
wal_decoder.feed_bytes(&wal);
|
||||
|
||||
// Deserialize and interpret WAL records from this batch of WAL.
|
||||
@@ -363,7 +373,9 @@ impl InterpretedWalReader {
|
||||
|
||||
let max_next_record_lsn = match max_next_record_lsn {
|
||||
Some(lsn) => lsn,
|
||||
None => { continue; }
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Update the current position such that new receivers can decide
|
||||
@@ -377,21 +389,38 @@ impl InterpretedWalReader {
|
||||
}
|
||||
}
|
||||
|
||||
let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();
|
||||
|
||||
// Send interpreted records downstream. Anything that has already been seen
|
||||
// by a shard is filtered out.
|
||||
let mut shard_senders_to_remove = Vec::new();
|
||||
for (shard, states) in &mut self.shard_senders {
|
||||
for state in states {
|
||||
if max_next_record_lsn <= state.next_record_lsn {
|
||||
continue;
|
||||
}
|
||||
|
||||
let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
|
||||
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
|
||||
|
||||
let batch = InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: Some(max_next_record_lsn),
|
||||
let batch = if max_next_record_lsn > state.next_record_lsn {
|
||||
// This batch contains at least one record that this shard has not
|
||||
// seen yet.
|
||||
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
|
||||
|
||||
InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: max_next_record_lsn,
|
||||
raw_wal_start_lsn: Some(batch_wal_start_lsn),
|
||||
}
|
||||
} else if wal_end_lsn > state.next_record_lsn {
|
||||
// All the records in this batch were seen by the shard
|
||||
// However, the batch maps to a chunk of WAL that the
|
||||
// shard has not yet seen. Notify it of the start LSN
|
||||
// of the PG WAL chunk such that it doesn't look like a gap.
|
||||
InterpretedWalRecords {
|
||||
records: Vec::default(),
|
||||
next_record_lsn: state.next_record_lsn,
|
||||
raw_wal_start_lsn: Some(batch_wal_start_lsn),
|
||||
}
|
||||
} else {
|
||||
// The shard has seen this chunk of WAL before. Skip it.
|
||||
continue;
|
||||
};
|
||||
|
||||
let res = state.tx.send(Batch {
|
||||
@@ -403,7 +432,7 @@ impl InterpretedWalReader {
|
||||
if res.is_err() {
|
||||
shard_senders_to_remove.push(shard_sender_id);
|
||||
} else {
|
||||
state.next_record_lsn = max_next_record_lsn;
|
||||
state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@ build_tag = os.environ["BUILD_TAG"]
|
||||
branch = os.environ["BRANCH"]
|
||||
dev_acr = os.environ["DEV_ACR"]
|
||||
prod_acr = os.environ["PROD_ACR"]
|
||||
dev_aws = os.environ["DEV_AWS"]
|
||||
prod_aws = os.environ["PROD_AWS"]
|
||||
aws_region = os.environ["AWS_REGION"]
|
||||
|
||||
components = {
|
||||
"neon": ["neon"],
|
||||
@@ -24,11 +27,11 @@ components = {
|
||||
registries = {
|
||||
"dev": [
|
||||
"docker.io/neondatabase",
|
||||
"369495373322.dkr.ecr.eu-central-1.amazonaws.com",
|
||||
f"{dev_aws}.dkr.ecr.{aws_region}.amazonaws.com",
|
||||
f"{dev_acr}.azurecr.io/neondatabase",
|
||||
],
|
||||
"prod": [
|
||||
"093970136003.dkr.ecr.eu-central-1.amazonaws.com",
|
||||
f"{prod_aws}.dkr.ecr.{aws_region}.amazonaws.com",
|
||||
f"{prod_acr}.azurecr.io/neondatabase",
|
||||
],
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ hex.workspace = true
|
||||
hyper0.workspace = true
|
||||
humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
json-structural-diff.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
@@ -34,6 +35,7 @@ reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
regex.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE nodes DROP listen_https_port;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE nodes ADD listen_https_port INTEGER;
|
||||
@@ -10,7 +10,10 @@ use std::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
|
||||
use pageserver_api::{
|
||||
controller_api::{NodeAvailability, SkSchedulingPolicy},
|
||||
models::PageserverUtilization,
|
||||
};
|
||||
|
||||
use thiserror::Error;
|
||||
use utils::{id::NodeId, logging::SecretString};
|
||||
@@ -137,8 +140,13 @@ where
|
||||
request = self.receiver.recv() => {
|
||||
match request {
|
||||
Some(req) => {
|
||||
if req.reply.is_closed() {
|
||||
// Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
|
||||
continue;
|
||||
}
|
||||
let res = self.heartbeat(req.servers).await;
|
||||
req.reply.send(res).unwrap();
|
||||
// Ignore the return value in order to not panic if the heartbeat function's future was cancelled
|
||||
_ = req.reply.send(res);
|
||||
},
|
||||
None => { return; }
|
||||
}
|
||||
@@ -311,6 +319,9 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
for (node_id, sk) in &*safekeepers {
|
||||
if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
|
||||
continue;
|
||||
}
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self
|
||||
.jwt_token
|
||||
@@ -340,7 +351,13 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
|
||||
// We ignore the node in this case.
|
||||
return None;
|
||||
}
|
||||
Err(_) => SafekeeperState::Offline,
|
||||
Err(e) => {
|
||||
tracing::info!(
|
||||
"Marking safekeeper {} at as offline: {e}",
|
||||
sk.base_url()
|
||||
);
|
||||
SafekeeperState::Offline
|
||||
}
|
||||
};
|
||||
|
||||
Some((*node_id, status))
|
||||
|
||||
@@ -9,7 +9,10 @@ use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECON
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use http_utils::{
|
||||
endpoint::{self, auth_middleware, check_permission_with, request_span},
|
||||
endpoint::{
|
||||
self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
|
||||
request_span,
|
||||
},
|
||||
error::ApiError,
|
||||
failpoints::failpoints_handler,
|
||||
json::{json_request, json_response},
|
||||
@@ -54,7 +57,7 @@ pub struct HttpState {
|
||||
service: Arc<crate::service::Service>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
neon_metrics: NeonMetrics,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
allowlist_routes: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl HttpState {
|
||||
@@ -63,15 +66,17 @@ impl HttpState {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
build_info: BuildInfo,
|
||||
) -> Self {
|
||||
let allowlist_routes = ["/status", "/ready", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
Self {
|
||||
service,
|
||||
auth,
|
||||
neon_metrics: NeonMetrics::new(build_info),
|
||||
allowlist_routes,
|
||||
allowlist_routes: &[
|
||||
"/status",
|
||||
"/ready",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
"/profile/heap",
|
||||
],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -516,6 +521,24 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// HTTP handler that asks the service to download heatmap layers for one timeline.
///
/// Reads `tenant_shard_id` and `timeline_id` via `parse_request_param` and the optional
/// `concurrency` value via `parse_query_param`, requires `Scope::PageServerApi`, then
/// calls `Service::tenant_timeline_download_heatmap_layers`. Returns `200 OK` with a
/// `()` JSON body on success; any parse, permission or service error is propagated.
async fn handle_tenant_timeline_download_heatmap_layers(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
|
||||
|
||||
// The permission check runs after the tenant shard id is parsed but before the remaining parameters.
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
// `concurrency` is optional; `None` is passed through to the service as-is.
let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
|
||||
|
||||
service
|
||||
.tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
|
||||
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
|
||||
// compare to, so we can just filter out our well known ID format with regexes.
|
||||
@@ -575,7 +598,10 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
let _timer = latency.start_timer(labels.clone());
|
||||
|
||||
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
|
||||
let client = mgmt_api::Client::new(
|
||||
node.base_url(),
|
||||
service.get_config().pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
@@ -1331,10 +1357,7 @@ async fn handle_safekeeper_scheduling_policy(
|
||||
.set_safekeeper_scheduling_policy(id, body.scheduling_policy)
|
||||
.await?;
|
||||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::empty())
|
||||
.unwrap())
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
|
||||
@@ -1398,23 +1421,26 @@ pub fn prologue_leadership_status_check_middleware<
|
||||
let state = get_state(&req);
|
||||
let leadership_status = state.service.get_leadership_status();
|
||||
|
||||
enum AllowedRoutes<'a> {
|
||||
enum AllowedRoutes {
|
||||
All,
|
||||
Some(Vec<&'a str>),
|
||||
Some(&'static [&'static str]),
|
||||
}
|
||||
|
||||
let allowed_routes = match leadership_status {
|
||||
LeadershipStatus::Leader => AllowedRoutes::All,
|
||||
LeadershipStatus::SteppedDown => AllowedRoutes::All,
|
||||
LeadershipStatus::Candidate => {
|
||||
AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
|
||||
}
|
||||
LeadershipStatus::Candidate => AllowedRoutes::Some(&[
|
||||
"/ready",
|
||||
"/status",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
"/profile/heap",
|
||||
]),
|
||||
};
|
||||
|
||||
let uri = req.uri().to_string();
|
||||
match allowed_routes {
|
||||
AllowedRoutes::All => Ok(req),
|
||||
AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
|
||||
AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
|
||||
_ => {
|
||||
tracing::info!(
|
||||
"Request {} not allowed due to current leadership state",
|
||||
@@ -1523,7 +1549,8 @@ enum ForwardOutcome {
|
||||
|
||||
/// Potentially forward the request to the current storage controller leader.
|
||||
/// More specifically we forward when:
|
||||
/// 1. Request is not one of ["/control/v1/step_down", "/status", "/ready", "/metrics"]
|
||||
/// 1. Request is not one of:
|
||||
/// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
|
||||
/// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
|
||||
/// 3. There is a leader in the database to forward to
|
||||
/// 4. Leader from step (3) is not the current instance
|
||||
@@ -1544,10 +1571,17 @@ enum ForwardOutcome {
|
||||
/// Hence, if we are in the edge case scenario the leader persisted in the database is the
|
||||
/// stepped down instance that received the request. Condition (4) above covers this scenario.
|
||||
async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
const NOT_FOR_FORWARD: [&str; 4] = ["/control/v1/step_down", "/status", "/ready", "/metrics"];
|
||||
const NOT_FOR_FORWARD: &[&str] = &[
|
||||
"/control/v1/step_down",
|
||||
"/status",
|
||||
"/ready",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
"/profile/heap",
|
||||
];
|
||||
|
||||
let uri = req.uri().to_string();
|
||||
let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.as_str());
|
||||
let uri = req.uri();
|
||||
let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
|
||||
|
||||
// Fast return before trying to take any Service locks, if we will never forward anyway
|
||||
if !uri_for_forward {
|
||||
@@ -1747,7 +1781,7 @@ pub fn make_router(
|
||||
if auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
let state = get_state(request);
|
||||
if state.allowlist_routes.contains(request.uri()) {
|
||||
if state.allowlist_routes.contains(&request.uri().path()) {
|
||||
None
|
||||
} else {
|
||||
state.auth.as_deref()
|
||||
@@ -1760,13 +1794,19 @@ pub fn make_router(
|
||||
.get("/metrics", |r| {
|
||||
named_request_span(r, measured_metrics_handler, RequestName("metrics"))
|
||||
})
|
||||
// Non-prefixed generic endpoints (status, metrics)
|
||||
// Non-prefixed generic endpoints (status, metrics, profiling)
|
||||
.get("/status", |r| {
|
||||
named_request_span(r, handle_status, RequestName("status"))
|
||||
})
|
||||
.get("/ready", |r| {
|
||||
named_request_span(r, handle_ready, RequestName("ready"))
|
||||
})
|
||||
.get("/profile/cpu", |r| {
|
||||
named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
|
||||
})
|
||||
.get("/profile/heap", |r| {
|
||||
named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
|
||||
})
|
||||
// Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
|
||||
.post("/upcall/v1/re-attach", |r| {
|
||||
named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
|
||||
@@ -2078,6 +2118,16 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_download_heatmap_layers,
|
||||
RequestName("v1_tenant_timeline_download_heatmap_layers"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// Tenant detail GET passthrough to shard zero:
|
||||
.get("/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(
|
||||
|
||||
@@ -27,6 +27,16 @@ use utils::{project_build_tag, project_git_version, tcp_listener};
|
||||
project_git_version!(GIT_VERSION);
|
||||
project_build_tag!(BUILD_TAG);
|
||||
|
||||
#[global_allocator]
|
||||
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
|
||||
/// Configure jemalloc to profile heap allocations by sampling stack traces every 2 MB (1 << 21).
|
||||
/// This adds roughly 3% overhead for allocations on average, which is acceptable considering
|
||||
/// performance-sensitive code will avoid allocations as far as possible anyway.
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[export_name = "malloc_conf"]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
@@ -43,6 +53,10 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
jwt_token: Option<String>,
|
||||
|
||||
/// Token for authenticating this service with the safekeepers it controls
|
||||
#[arg(long)]
|
||||
safekeeper_jwt_token: Option<String>,
|
||||
|
||||
/// Token for authenticating this service with the control plane, when calling
|
||||
/// the compute notification endpoint
|
||||
#[arg(long)]
|
||||
@@ -116,6 +130,10 @@ struct Cli {
|
||||
|
||||
#[arg(long)]
|
||||
long_reconcile_threshold: Option<humantime::Duration>,
|
||||
|
||||
// Flag to use https for requests to pageserver API.
|
||||
#[arg(long, default_value = "false")]
|
||||
use_https_pageserver_api: bool,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -139,7 +157,8 @@ impl Default for StrictMode {
|
||||
struct Secrets {
|
||||
database_url: String,
|
||||
public_key: Option<JwtAuth>,
|
||||
jwt_token: Option<String>,
|
||||
pageserver_jwt_token: Option<String>,
|
||||
safekeeper_jwt_token: Option<String>,
|
||||
control_plane_jwt_token: Option<String>,
|
||||
peer_jwt_token: Option<String>,
|
||||
}
|
||||
@@ -147,6 +166,7 @@ struct Secrets {
|
||||
impl Secrets {
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
const SAFEKEEPER_JWT_TOKEN_ENV: &'static str = "SAFEKEEPER_JWT_TOKEN";
|
||||
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
|
||||
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
|
||||
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
|
||||
@@ -170,7 +190,14 @@ impl Secrets {
|
||||
let this = Self {
|
||||
database_url,
|
||||
public_key,
|
||||
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
|
||||
pageserver_jwt_token: Self::load_secret(
|
||||
&args.jwt_token,
|
||||
Self::PAGESERVER_JWT_TOKEN_ENV,
|
||||
),
|
||||
safekeeper_jwt_token: Self::load_secret(
|
||||
&args.safekeeper_jwt_token,
|
||||
Self::SAFEKEEPER_JWT_TOKEN_ENV,
|
||||
),
|
||||
control_plane_jwt_token: Self::load_secret(
|
||||
&args.control_plane_jwt_token,
|
||||
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
|
||||
@@ -250,11 +277,17 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
|
||||
let secrets = Secrets::load(&args).await?;
|
||||
|
||||
// TODO: once we've rolled out the safekeeper JWT token everywhere, put it into the validation code below
|
||||
tracing::info!(
|
||||
"safekeeper_jwt_token set: {:?}",
|
||||
secrets.safekeeper_jwt_token.is_some()
|
||||
);
|
||||
|
||||
// Validate required secrets and arguments are provided in strict mode
|
||||
match strict_mode {
|
||||
StrictMode::Strict
|
||||
if (secrets.public_key.is_none()
|
||||
|| secrets.jwt_token.is_none()
|
||||
|| secrets.pageserver_jwt_token.is_none()
|
||||
|| secrets.control_plane_jwt_token.is_none()) =>
|
||||
{
|
||||
// Production systems should always have secrets configured: if public_key was not set
|
||||
@@ -279,7 +312,8 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let config = Config {
|
||||
jwt_token: secrets.jwt_token,
|
||||
pageserver_jwt_token: secrets.pageserver_jwt_token,
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
peer_jwt_token: secrets.peer_jwt_token,
|
||||
compute_hook_url: args.compute_hook_url,
|
||||
@@ -311,6 +345,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
address_for_peers: args.address_for_peers,
|
||||
start_as_candidate: args.start_as_candidate,
|
||||
http_service_port: args.listen.port() as i32,
|
||||
use_https_pageserver_api: args.use_https_pageserver_api,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{str::FromStr, time::Duration};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
|
||||
@@ -32,12 +33,16 @@ pub(crate) struct Node {
|
||||
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
listen_https_port: Option<u16>,
|
||||
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
|
||||
availability_zone_id: AvailabilityZone,
|
||||
|
||||
// Flag from storcon's config to use https for pageserver admin API.
|
||||
// Invariant: if |true|, listen_https_port should contain a value.
|
||||
use_https: bool,
|
||||
// This cancellation token means "stop any RPCs in flight to this node, and don't start
|
||||
// any more". It is not related to process shutdown.
|
||||
#[serde(skip)]
|
||||
@@ -56,7 +61,16 @@ pub(crate) enum AvailabilityTransition {
|
||||
|
||||
impl Node {
|
||||
pub(crate) fn base_url(&self) -> String {
|
||||
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
|
||||
if self.use_https {
|
||||
format!(
|
||||
"https://{}:{}",
|
||||
self.listen_http_addr,
|
||||
self.listen_https_port
|
||||
.expect("https port should be specified if use_https is on")
|
||||
)
|
||||
} else {
|
||||
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_id(&self) -> NodeId {
|
||||
@@ -82,11 +96,20 @@ impl Node {
|
||||
self.id == register_req.node_id
|
||||
&& self.listen_http_addr == register_req.listen_http_addr
|
||||
&& self.listen_http_port == register_req.listen_http_port
|
||||
// Note: listen_https_port may change. See [`Self::need_update`] for more details.
|
||||
// && self.listen_https_port == register_req.listen_https_port
|
||||
&& self.listen_pg_addr == register_req.listen_pg_addr
|
||||
&& self.listen_pg_port == register_req.listen_pg_port
|
||||
&& self.availability_zone_id == register_req.availability_zone_id
|
||||
}
|
||||
|
||||
// Do we need to update an existing record in DB on this registration request?
|
||||
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
|
||||
// listen_https_port is checked here because it may change during migration to https.
|
||||
// After migration, this check may be moved to registration_match.
|
||||
self.listen_https_port != register_req.listen_https_port
|
||||
}
|
||||
|
||||
/// For a shard located on this node, populate a response object
|
||||
/// with this node's address information.
|
||||
pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
|
||||
@@ -95,6 +118,7 @@ impl Node {
|
||||
node_id: self.id,
|
||||
listen_http_addr: self.listen_http_addr.clone(),
|
||||
listen_http_port: self.listen_http_port,
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
}
|
||||
@@ -175,25 +199,34 @@ impl Node {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
id: NodeId,
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
listen_https_port: Option<u16>,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
) -> Self {
|
||||
Self {
|
||||
use_https: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
if use_https && listen_https_port.is_none() {
|
||||
return Err(anyhow!("https is enabled, but node has no https port"));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
id,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
availability: NodeAvailability::Offline,
|
||||
availability_zone_id,
|
||||
use_https,
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn to_persistent(&self) -> NodePersistence {
|
||||
@@ -202,14 +235,19 @@ impl Node {
|
||||
scheduling_policy: self.scheduling.into(),
|
||||
listen_http_addr: self.listen_http_addr.clone(),
|
||||
listen_http_port: self.listen_http_port as i32,
|
||||
listen_https_port: self.listen_https_port.map(|x| x as i32),
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_persistent(np: NodePersistence) -> Self {
|
||||
Self {
|
||||
pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
|
||||
if use_https && np.listen_https_port.is_none() {
|
||||
return Err(anyhow!("https is enabled, but node has no https port"));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
id: NodeId(np.node_id as u64),
|
||||
// At startup we consider a node offline until proven otherwise.
|
||||
availability: NodeAvailability::Offline,
|
||||
@@ -217,11 +255,13 @@ impl Node {
|
||||
.expect("Bad scheduling policy in DB"),
|
||||
listen_http_addr: np.listen_http_addr,
|
||||
listen_http_port: np.listen_http_port as u16,
|
||||
listen_https_port: np.listen_https_port.map(|x| x as u16),
|
||||
listen_pg_addr: np.listen_pg_addr,
|
||||
listen_pg_port: np.listen_pg_port as u16,
|
||||
availability_zone_id: AvailabilityZone(np.availability_zone_id),
|
||||
use_https,
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Wrapper for issuing requests to pageserver management API: takes care of generic
|
||||
@@ -285,8 +325,9 @@ impl Node {
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
&format!(
|
||||
"Call to node {} ({}:{}) management API",
|
||||
self.id, self.listen_http_addr, self.listen_http_port
|
||||
"Call to node {} ({}) management API",
|
||||
self.id,
|
||||
self.base_url(),
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
@@ -302,6 +343,7 @@ impl Node {
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
listen_http_addr: self.listen_http_addr.clone(),
|
||||
listen_http_port: self.listen_http_port,
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
}
|
||||
|
||||
@@ -280,6 +280,22 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn timeline_download_heatmap_layers(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
concurrency: Option<usize>,
|
||||
) -> Result<()> {
|
||||
measured_request!(
|
||||
"download_heatmap_layers",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
|
||||
measured_request!(
|
||||
"utilization",
|
||||
|
||||
@@ -375,18 +375,23 @@ impl Persistence {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_node(
|
||||
pub(crate) async fn update_node<V>(
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_scheduling: NodeSchedulingPolicy,
|
||||
) -> DatabaseResult<()> {
|
||||
values: V,
|
||||
) -> DatabaseResult<()>
|
||||
where
|
||||
V: diesel::AsChangeset<Target = crate::schema::nodes::table> + Clone + Send + Sync,
|
||||
V::Changeset: diesel::query_builder::QueryFragment<diesel::pg::Pg> + Send, // valid Postgres SQL
|
||||
{
|
||||
use crate::schema::nodes::dsl::*;
|
||||
let updated = self
|
||||
.with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
|
||||
let values = values.clone();
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(nodes)
|
||||
.filter(node_id.eq(input_node_id.0 as i64))
|
||||
.set((scheduling_policy.eq(String::from(input_scheduling)),))
|
||||
.set(values)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
Ok(updated)
|
||||
@@ -403,6 +408,32 @@ impl Persistence {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_node_scheduling_policy(
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_scheduling: NodeSchedulingPolicy,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::nodes::dsl::*;
|
||||
self.update_node(
|
||||
input_node_id,
|
||||
scheduling_policy.eq(String::from(input_scheduling)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn update_node_on_registration(
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_https_port: Option<u16>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::nodes::dsl::*;
|
||||
self.update_node(
|
||||
input_node_id,
|
||||
listen_https_port.eq(input_https_port.map(|x| x as i32)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// At startup, load the high level state for shards, such as their config + policy. This will
|
||||
/// be enriched at runtime with state discovered on pageservers.
|
||||
///
|
||||
@@ -1452,6 +1483,7 @@ pub(crate) struct NodePersistence {
|
||||
pub(crate) listen_pg_addr: String,
|
||||
pub(crate) listen_pg_port: i32,
|
||||
pub(crate) availability_zone_id: String,
|
||||
pub(crate) listen_https_port: Option<i32>,
|
||||
}
|
||||
|
||||
/// Tenant metadata health status that are stored durably.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::{compute_hook, service};
|
||||
use json_structural_diff::JsonDiff;
|
||||
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
|
||||
@@ -24,7 +25,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Object with the lifetime of the background reconcile task that is created
|
||||
/// for tenants which have a difference between their intent and observed states.
|
||||
@@ -296,7 +297,7 @@ impl Reconciler {
|
||||
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
|
||||
.await
|
||||
},
|
||||
&self.service_config.jwt_token,
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
1,
|
||||
3,
|
||||
timeout,
|
||||
@@ -417,7 +418,7 @@ impl Reconciler {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.jwt_token.as_deref(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
client
|
||||
@@ -440,7 +441,7 @@ impl Reconciler {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.jwt_token.as_deref(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
let timelines = client.timeline_list(&tenant_shard_id).await?;
|
||||
@@ -478,7 +479,7 @@ impl Reconciler {
|
||||
)
|
||||
.await
|
||||
},
|
||||
&self.service_config.jwt_token,
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
1,
|
||||
3,
|
||||
request_download_timeout * 2,
|
||||
@@ -771,7 +772,7 @@ impl Reconciler {
|
||||
let observed_conf = match attached_node
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.jwt_token,
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
1,
|
||||
1,
|
||||
Duration::from_secs(5),
|
||||
@@ -880,7 +881,27 @@ impl Reconciler {
|
||||
self.generation = Some(generation);
|
||||
wanted_conf.generation = generation.into();
|
||||
}
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
|
||||
|
||||
let diff = match observed {
|
||||
Some(ObservedStateLocation {
|
||||
conf: Some(observed),
|
||||
}) => {
|
||||
let diff = JsonDiff::diff(
|
||||
&serde_json::to_value(observed.clone()).unwrap(),
|
||||
&serde_json::to_value(wanted_conf.clone()).unwrap(),
|
||||
false,
|
||||
);
|
||||
|
||||
if let Some(json_diff) = diff.diff {
|
||||
serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
|
||||
} else {
|
||||
"unknown".to_string()
|
||||
}
|
||||
}
|
||||
_ => "full".to_string(),
|
||||
};
|
||||
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
|
||||
|
||||
// Because `node` comes from a ref to &self, clone it before calling into a &mut self
|
||||
// function: this could be avoided by refactoring the state mutated by location_config into
|
||||
@@ -1099,7 +1120,7 @@ impl Reconciler {
|
||||
match origin
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.jwt_token,
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
1,
|
||||
3,
|
||||
Duration::from_secs(5),
|
||||
@@ -1180,7 +1201,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
|
||||
let mut config = config.clone();
|
||||
if has_secondaries {
|
||||
if config.heatmap_period.is_none() {
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
|
||||
}
|
||||
} else {
|
||||
config.heatmap_period = None;
|
||||
|
||||
@@ -18,12 +18,14 @@ pub struct Safekeeper {
|
||||
cancel: CancellationToken,
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
id: NodeId,
|
||||
availability: SafekeeperState,
|
||||
}
|
||||
|
||||
impl Safekeeper {
|
||||
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
|
||||
let scheduling_policy = SkSchedulingPolicy::from_str(&skp.scheduling_policy).unwrap();
|
||||
Self {
|
||||
cancel,
|
||||
listen_http_addr: skp.host.clone(),
|
||||
@@ -31,6 +33,7 @@ impl Safekeeper {
|
||||
id: NodeId(skp.id as u64),
|
||||
skp,
|
||||
availability: SafekeeperState::Offline,
|
||||
scheduling_policy,
|
||||
}
|
||||
}
|
||||
pub(crate) fn base_url(&self) -> String {
|
||||
@@ -46,6 +49,13 @@ impl Safekeeper {
|
||||
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
|
||||
self.availability = availability;
|
||||
}
|
||||
pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
|
||||
self.scheduling_policy
|
||||
}
|
||||
pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
|
||||
self.scheduling_policy = scheduling_policy;
|
||||
self.skp.scheduling_policy = String::from(scheduling_policy);
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
&self,
|
||||
@@ -102,7 +112,7 @@ impl Safekeeper {
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
&format!(
|
||||
"Call to node {} ({}:{}) management API",
|
||||
"Call to safekeeper {} ({}:{}) management API",
|
||||
self.id, self.listen_http_addr, self.listen_http_port
|
||||
),
|
||||
cancel,
|
||||
@@ -129,10 +139,8 @@ impl Safekeeper {
|
||||
self.id.0
|
||||
);
|
||||
}
|
||||
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
|
||||
record,
|
||||
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
|
||||
);
|
||||
self.skp =
|
||||
crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
|
||||
self.listen_http_port = http_port as u16;
|
||||
self.listen_http_addr = host;
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user