Merge pull request #10952 from neondatabase/rc/release-compute/2025-02-24

Compute release 2025-02-24
Tristan Partin
2025-02-24 13:09:01 -06:00
committed by GitHub
213 changed files with 3446 additions and 1394 deletions

View File

@@ -28,3 +28,7 @@ config-variables:
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION

View File

@@ -38,9 +38,11 @@ runs:
#
- name: Set variables
shell: bash -euxo pipefail {0}
env:
PR_NUMBER: ${{ github.event.pull_request.number }}
BUCKET: neon-github-public-dev
run: |
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
if [ -n "${PR_NUMBER}" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \
[ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then
@@ -59,8 +61,6 @@ runs:
echo "LOCK_FILE=${LOCK_FILE}" >> $GITHUB_ENV
echo "WORKDIR=${WORKDIR}" >> $GITHUB_ENV
echo "BUCKET=${BUCKET}" >> $GITHUB_ENV
env:
BUCKET: neon-github-public-dev
# TODO: We can replace with a special docker image with Java and Allure pre-installed
- uses: actions/setup-java@v4
@@ -80,8 +80,8 @@ runs:
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.27.0
ALLURE_ZIP_SHA256: b071858fb2fa542c65d8f152c5c40d26267b2dfb74df1f1608a589ecca38e777
ALLURE_VERSION: 2.32.2
ALLURE_ZIP_SHA256: 3f28885e2118f6317c92f667eaddcc6491400af1fb9773c1f3797a5fa5174953
- uses: aws-actions/configure-aws-credentials@v4
if: ${{ !cancelled() }}

View File

@@ -18,9 +18,11 @@ runs:
steps:
- name: Set variables
shell: bash -euxo pipefail {0}
env:
PR_NUMBER: ${{ github.event.pull_request.number }}
REPORT_DIR: ${{ inputs.report-dir }}
run: |
PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${PR_NUMBER}" != "null" ]; then
if [ -n "${PR_NUMBER}" ]; then
BRANCH_OR_PR=pr-${PR_NUMBER}
elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \
[ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then
@@ -32,8 +34,6 @@ runs:
echo "BRANCH_OR_PR=${BRANCH_OR_PR}" >> $GITHUB_ENV
echo "REPORT_DIR=${REPORT_DIR}" >> $GITHUB_ENV
env:
REPORT_DIR: ${{ inputs.report-dir }}
- uses: aws-actions/configure-aws-credentials@v4
if: ${{ !cancelled() }}

View File

@@ -19,7 +19,11 @@ inputs:
default: '[1, 1]'
# settings below only needed if you want the project to be sharded from the beginning
shard_split_project:
description: 'by default new projects are not shard-split, specify true to shard-split'
description: 'by default new projects are not shard-split initially, but only when the shard-split threshold is reached, specify true to explicitly shard-split initially'
required: false
default: 'false'
disable_sharding:
description: 'by default new projects use the storage controller default policy to shard-split when the shard-split threshold is reached, specify true to explicitly disable sharding'
required: false
default: 'false'
admin_api_key:
@@ -107,6 +111,21 @@ runs:
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"new_shard_count\": $SHARD_COUNT, \"new_stripe_size\": $STRIPE_SIZE}"
fi
if [ "${DISABLE_SHARDING}" = "true" ]; then
# determine tenant ID
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
echo "Explicitly disabling shard-splitting for project ${project_id} with tenant_id ${TENANT_ID}"
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy"
echo "with body {\"scheduling\": \"Essential\"}"
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
curl -X PUT \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/policy" \
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"scheduling\": \"Essential\"}"
fi
env:
API_HOST: ${{ inputs.api_host }}
@@ -116,6 +135,7 @@ runs:
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
SHARD_SPLIT_PROJECT: ${{ inputs.shard_split_project }}
DISABLE_SHARDING: ${{ inputs.disable_sharding }}
ADMIN_API_KEY: ${{ inputs.admin_api_key }}
SHARD_COUNT: ${{ inputs.shard_count }}
STRIPE_SIZE: ${{ inputs.stripe_size }}

View File

@@ -236,5 +236,5 @@ runs:
uses: ./.github/actions/allure-report-store
with:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}
unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}-${{ runner.arch }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}

View File

@@ -2,7 +2,7 @@ name: Push images to Container Registry
on:
workflow_call:
inputs:
# Example: {"docker.io/neondatabase/neon:13196061314":["369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
# Example: {"docker.io/neondatabase/neon:13196061314":["${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/neon:13196061314","neoneastus2.azurecr.io/neondatabase/neon:13196061314"]}
image-map:
description: JSON map of images, mapping from a source image to an array of target images that should be pushed.
required: true

View File

@@ -68,7 +68,7 @@ jobs:
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
@@ -859,14 +859,17 @@ jobs:
BRANCH: "${{ github.ref_name }}"
DEV_ACR: "${{ vars.AZURE_DEV_REGISTRY_NAME }}"
PROD_ACR: "${{ vars.AZURE_PROD_REGISTRY_NAME }}"
DEV_AWS: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
PROD_AWS: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
AWS_REGION: "${{ vars.AWS_ECR_REGION }}"
push-neon-image-dev:
needs: [ generate-image-maps, neon-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.neon-dev }}'
aws-region: eu-central-1
aws-account-ids: "369495373322"
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -881,8 +884,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.compute-dev }}'
aws-region: eu-central-1
aws-account-ids: "369495373322"
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -898,8 +901,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.neon-prod }}'
aws-region: eu-central-1
aws-account-ids: "093970136003"
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -915,8 +918,8 @@ jobs:
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: '${{ needs.generate-image-maps.outputs.compute-prod }}'
aws-region: eu-central-1
aws-account-ids: "093970136003"
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_PROD_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_PROD_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
@@ -1029,7 +1032,7 @@ jobs:
statuses: write
contents: write
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/ansible:latest
steps:
- uses: actions/checkout@v4
@@ -1178,6 +1181,22 @@ jobs:
exit 1
fi
notify-storage-release-deploy-failure:
needs: [ deploy ]
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
runs-on: ubuntu-22.04
steps:
- name: Post release-deploy failure to team-storage slack channel
uses: slackapi/slack-github-action@v2
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
text: |
🔴 @oncall-storage: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:
needs: [ deploy ]
@@ -1274,7 +1293,7 @@ jobs:
done
pin-build-tools-image:
needs: [ build-build-tools-image, push-compute-image-prod, push-neon-image-prod, build-and-test-locally ]
needs: [ build-build-tools-image, test-images, build-and-test-locally ]
if: github.ref_name == 'main'
uses: ./.github/workflows/pin-build-tools-image.yml
with:

View File

@@ -27,7 +27,7 @@ env:
jobs:
tag:
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}

View File

@@ -32,18 +32,27 @@ jobs:
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
disable_sharding: false
- target_project: new_empty_project_stripe_size_32768
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
# while here it is sharded from the beginning with a stripe size of 256 MiB
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 17
- target_project: large_existing_project
stripe_size: null # cannot re-shard or choose a different stripe size for an existing, already sharded project
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project_unsharded
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: true
postgres_version: 16
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
@@ -96,6 +105,7 @@ jobs:
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
shard_count: 8
stripe_size: ${{ matrix.stripe_size }}
disable_sharding: ${{ matrix.disable_sharding }}
- name: Initialize Neon project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}

View File

@@ -33,10 +33,6 @@ concurrency:
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
env:
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: pinned
jobs:
check-manifests:
runs-on: ubuntu-22.04
@@ -46,11 +42,14 @@ jobs:
steps:
- name: Check if we really need to pin the image
id: check-manifests
env:
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: pinned
run: |
docker manifest inspect neondatabase/build-tools:${FROM_TAG} > ${FROM_TAG}.json
docker manifest inspect neondatabase/build-tools:${TO_TAG} > ${TO_TAG}.json
docker manifest inspect "docker.io/neondatabase/build-tools:${FROM_TAG}" > "${FROM_TAG}.json"
docker manifest inspect "docker.io/neondatabase/build-tools:${TO_TAG}" > "${TO_TAG}.json"
if diff ${FROM_TAG}.json ${TO_TAG}.json; then
if diff "${FROM_TAG}.json" "${TO_TAG}.json"; then
skip=true
else
skip=false
@@ -64,55 +63,34 @@ jobs:
# use format(..) to catch both inputs.force = true AND inputs.force = 'true'
if: needs.check-manifests.outputs.skip == 'false' || format('{0}', inputs.force) == 'true'
runs-on: ubuntu-22.04
permissions:
id-token: write # for `azure/login` and aws auth
id-token: write # Required for aws/azure login
steps:
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 3600
- name: Login to Amazon Dev ECR
uses: aws-actions/amazon-ecr-login@v2
- name: Azure login
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
run: |
az acr login --name=neoneastus2
- name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
env:
DEFAULT_DEBIAN_VERSION: bookworm
run: |
for debian_version in bullseye bookworm; do
tags=()
tags+=("-t" "neondatabase/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}-${debian_version}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}-${debian_version}")
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
tags+=("-t" "neondatabase/build-tools:${TO_TAG}")
tags+=("-t" "369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG}")
tags+=("-t" "neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG}")
fi
docker buildx imagetools create "${tags[@]}" \
neondatabase/build-tools:${FROM_TAG}-${debian_version}
done
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bullseye": [
"docker.io/neondatabase/build-tools:pinned-bullseye",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bullseye",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bullseye"
],
"docker.io/neondatabase/build-tools:${{ inputs.from-tag }}-bookworm": [
"docker.io/neondatabase/build-tools:pinned-bookworm",
"docker.io/neondatabase/build-tools:pinned",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned-bookworm",
"${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/build-tools:pinned",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned-bookworm",
"${{ vars.AZURE_DEV_REGISTRY_NAME }}.azurecr.io/neondatabase/build-tools:pinned"
]
}
aws-region: ${{ vars.AWS_ECR_REGION }}
aws-account-ids: "${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}"
azure-client-id: ${{ vars.AZURE_DEV_CLIENT_ID }}
azure-subscription-id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
azure-tenant-id: ${{ vars.AZURE_TENANT_ID }}
acr-registry-name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
secrets:
aws-role-to-assume: "${{ vars.DEV_AWS_OIDC_ROLE_ARN }}"
docker-hub-username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}

Cargo.lock generated
View File

@@ -1316,7 +1316,6 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"notify",
@@ -1326,7 +1325,6 @@ dependencies = [
"opentelemetry_sdk",
"postgres",
"postgres_initdb",
"prometheus",
"regex",
"remote_storage",
"reqwest",
@@ -1345,7 +1343,6 @@ dependencies = [
"tower 0.5.2",
"tower-http",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-utils",
"url",
@@ -1877,6 +1874,12 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.7"
@@ -3334,6 +3337,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json-structural-diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
dependencies = [
"difflib",
"regex",
"serde_json",
]
[[package]]
name = "jsonwebtoken"
version = "9.2.0"
@@ -4158,7 +4172,6 @@ dependencies = [
"pageserver_client",
"pageserver_compaction",
"pin-project-lite",
"postgres",
"postgres-protocol",
"postgres-types",
"postgres_backend",
@@ -4245,7 +4258,6 @@ dependencies = [
"futures",
"http-utils",
"pageserver_api",
"postgres",
"reqwest",
"serde",
"thiserror 1.0.69",
@@ -4660,7 +4672,6 @@ dependencies = [
"anyhow",
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres",
"url",
]
@@ -5802,7 +5813,6 @@ dependencies = [
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
@@ -6446,6 +6456,7 @@ dependencies = [
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"json-structural-diff",
"lasso",
"measured",
"metrics",
@@ -6468,6 +6479,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tikv-jemallocator",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
@@ -7021,14 +7033,11 @@ dependencies = [
name = "tokio-postgres2"
version = "0.1.0"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-util",
"log",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol2",
@@ -7615,13 +7624,13 @@ dependencies = [
"hex",
"hex-literal",
"humantime",
"inferno 0.12.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"once_cell",
"pin-project-lite",
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",

View File

@@ -210,6 +210,7 @@ rustls-native-certs = "0.8"
x509-parser = "0.16"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }
## TODO replace this with tracing
env_logger = "0.10"

View File

@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.84.1
ENV RUSTC_VERSION=1.85.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -395,15 +395,22 @@ RUN case "${PG_VERSION:?}" in \
cd plv8-src && \
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
FROM pg-build AS plv8-build
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
# (The V8 engine takes a very long time to build)
FROM build-deps AS plv8-build
ARG PG_VERSION
WORKDIR /ext-src/plv8-src
RUN apt update && \
apt install --no-install-recommends --no-install-suggests -y \
ninja-build python3-dev libncurses5 binutils clang \
&& apt clean && rm -rf /var/lib/apt/lists/*
COPY --from=plv8-src /ext-src/ /ext-src/
WORKDIR /ext-src/plv8-src
RUN make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) v8
# Step 2: Build the PostgreSQL-dependent parts
COPY --from=pg-build /usr/local/pgsql /usr/local/pgsql
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN \
# generate and copy upgrade scripts
make generate_upgrades && \
@@ -1848,14 +1855,20 @@ COPY --from=pg_semver-src /ext-src/ /ext-src/
COPY --from=pg_ivm-src /ext-src/ /ext-src/
COPY --from=pg_partman-src /ext-src/ /ext-src/
#COPY --from=pg_mooncake-src /ext-src/ /ext-src/
#COPY --from=pg_repack-src /ext-src/ /ext-src/
COPY --from=pg_repack-src /ext-src/ /ext-src/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/pg_repack.patch /ext-src
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres
ENV PG_VERSION=${PG_VERSION:?}
#########################################################################################
#

View File

@@ -0,0 +1,72 @@
diff --git a/regress/Makefile b/regress/Makefile
index bf6edcb..89b4c7f 100644
--- a/regress/Makefile
+++ b/regress/Makefile
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
# Test suite
#
-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
+++ b/regress/expected/nosuper.out
@@ -4,22 +4,22 @@
SET client_min_messages = error;
DROP ROLE IF EXISTS nosuper;
SET client_min_messages = warning;
-CREATE ROLE nosuper WITH LOGIN;
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
-- => OK
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
INFO: repacking table "public.tbl_cluster"
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
ERROR: pg_repack failed with error: You must be a superuser to use pg_repack
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
ERROR: pg_repack failed with error: ERROR: permission denied for schema repack
LINE 1: select repack.version(), repack.version_sql()
^
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
GRANT USAGE ON SCHEMA repack TO nosuper;
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql
+++ b/regress/sql/nosuper.sql
@@ -4,19 +4,19 @@
SET client_min_messages = error;
DROP ROLE IF EXISTS nosuper;
SET client_min_messages = warning;
-CREATE ROLE nosuper WITH LOGIN;
+CREATE ROLE nosuper WITH LOGIN PASSWORD 'NoSuPeRpAsSwOrD';
-- => OK
\! pg_repack --dbname=contrib_regression --table=tbl_cluster --no-superuser-check
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
GRANT ALL ON ALL TABLES IN SCHEMA repack TO nosuper;
GRANT USAGE ON SCHEMA repack TO nosuper;
-- => ERROR
-\! pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
+\! PGPASSWORD=NoSuPeRpAsSwOrD pg_repack --dbname=contrib_regression --table=tbl_cluster --username=nosuper --no-superuser-check
REVOKE ALL ON ALL TABLES IN SCHEMA repack FROM nosuper;
REVOKE USAGE ON SCHEMA repack FROM nosuper;

View File

@@ -25,7 +25,6 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true
@@ -48,13 +47,11 @@ tokio-postgres.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
uuid.workspace = true
prometheus.workspace = true
walkdir.workspace = true
postgres_initdb.workspace = true

View File

@@ -41,7 +41,6 @@ use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::time::SystemTime;
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -86,19 +85,6 @@ fn parse_remote_ext_config(arg: &str) -> Result<String> {
}
}
/// Generate a compute ID if one is not supplied. This exists to keep forward
/// compatibility tests working, but will be removed in a future iteration.
fn generate_compute_id() -> String {
let now = SystemTime::now();
format!(
"compute-{}",
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
)
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
@@ -112,16 +98,13 @@ struct Cli {
/// outside the compute will talk to the compute through this port. Keep
/// the previous name for this argument around for a smoother release
/// with the control plane.
///
/// TODO: Remove the alias after the control plane release which teaches the
/// control plane about the renamed argument.
#[arg(long, alias = "http-port", default_value_t = 3080)]
#[arg(long, default_value_t = 3080)]
pub external_http_port: u16,
/// The port to bind the internal listening HTTP server to. Clients like
/// The port to bind the internal listening HTTP server to. Clients include
/// the neon extension (for installing remote extensions) and local_proxy.
#[arg(long)]
pub internal_http_port: Option<u16>,
#[arg(long, default_value_t = 3081)]
pub internal_http_port: u16,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
@@ -156,7 +139,7 @@ struct Cli {
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
#[arg(short = 'i', long, group = "compute-id")]
pub compute_id: String,
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
@@ -359,7 +342,7 @@ fn wait_spec(
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
internal_http_port: cli.internal_http_port,
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
@@ -383,7 +366,7 @@ fn wait_spec(
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
Server::Internal(cli.internal_http_port).launch(&compute);
if !spec_set {
// No spec provided, hang waiting for it.

View File

@@ -7,12 +7,12 @@ use std::sync::Arc;
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
use anyhow::{bail, Result};
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{debug, info_span, Instrument};
use tracing::{debug, info_span, warn, Instrument};
#[derive(Clone)]
pub enum DB {
@@ -47,6 +47,11 @@ pub enum PerDatabasePhase {
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
/// This is a shared phase, used for both i) dropping dangling LR subscriptions
/// before dropping the DB, and ii) dropping all subscriptions after creating
/// a fresh branch.
/// N.B. we will skip all DBs that are not present in Postgres, invalid, or
/// have `datallowconn = false` (`restrict_conn`).
DropLogicalSubscriptions,
}
@@ -168,7 +173,7 @@ where
///
/// In the future we may generate a single stream of changes and then
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behaviour of the commands.
/// batching behavior of the commands.
async fn get_operations<'a>(
spec: &'a ComputeSpec,
ctx: &'a RwLock<MutableApplyContext>,
@@ -451,6 +456,38 @@ async fn get_operations<'a>(
)),
}))),
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
// Do some checks that user DB exists and we can access it.
//
// During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
// which happen before dropping the DB, the current run could be a retry,
// so it's a valid case when DB is absent already. The case of
// `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
// in theory the user can have some dangling objects there, so we will fail at
// the actual drop later. Yet, to fix that in the current code we would need
// to ALTER DATABASE, and then check back, but that is even more invasive, so
// that's not what we really want to do here.
//
// For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
if let DB::UserDB(db) = db {
let databases = &ctx.read().await.dbs;
let edb = match databases.get(&db.name) {
Some(edb) => edb,
None => {
warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name);
return Ok(Box::new(empty()));
}
};
if edb.restrict_conn || edb.invalid {
warn!(
"skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
subphase, db.name, edb.restrict_conn, edb.invalid
);
return Ok(Box::new(empty()));
}
}
match subphase {
PerDatabasePhase::DropLogicalSubscriptions => {
match &db {
@@ -530,25 +567,12 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
PerDatabasePhase::ChangeSchemaPerms => {
let ctx = ctx.read().await;
let databases = &ctx.dbs;
let db = match &db {
// ignore schema permissions on the system database
DB::SystemDB => return Ok(Box::new(empty())),
DB::UserDB(db) => db,
};
if databases.get(&db.name).is_none() {
bail!("database {} doesn't exist in PostgreSQL", db.name);
}
let edb = databases.get(&db.name).unwrap();
if edb.restrict_conn || edb.invalid {
return Ok(Box::new(empty()));
}
let operations = vec![
Operation {
query: format!(
@@ -566,6 +590,7 @@ async fn get_operations<'a>(
Ok(Box::new(operations))
}
// TODO: remove this completely https://github.com/neondatabase/cloud/issues/22663
PerDatabasePhase::HandleAnonExtension => {
// Only install Anon into user databases
let db = match &db {

View File

@@ -2,6 +2,7 @@ DO $$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);

View File

@@ -46,6 +46,8 @@ use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
@@ -59,6 +61,7 @@ use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -81,8 +84,10 @@ pub struct EndpointConf {
internal_http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
}
//
@@ -179,7 +184,9 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
});
ep.create_endpoint_dir()?;
@@ -196,7 +203,9 @@ impl ComputeControlPlane {
pg_version,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
})?,
)?;
std::fs::write(
@@ -261,8 +270,11 @@ pub struct Endpoint {
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
reconfigure_concurrency: usize,
// Feature flags
features: Vec<ComputeFeature>,
// Cluster settings
cluster: Option<Cluster>,
}
#[derive(PartialEq, Eq)]
@@ -302,6 +314,8 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
debug!("serialized endpoint conf: {:?}", conf);
Ok(Endpoint {
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
external_http_address: SocketAddr::new(
@@ -319,8 +333,10 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
cluster: conf.cluster,
})
}
@@ -607,7 +623,7 @@ impl Endpoint {
};
// Create spec file
let spec = ComputeSpec {
let mut spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
@@ -640,7 +656,7 @@ impl Endpoint {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf),
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
@@ -653,9 +669,35 @@ impl Endpoint {
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
};
// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();
debug!("spec.cluster {:?}", spec.cluster);
// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -673,18 +715,14 @@ impl Endpoint {
println!("Also at '{}'", conn_str);
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
//cmd.args([
// "--external-http-port",
// &self.external_http_address.port().to_string(),
//])
//.args([
// "--internal-http-port",
// &self.internal_http_address.port().to_string(),
//])
cmd.args([
"--http-port",
"--external-http-port",
&self.external_http_address.port().to_string(),
])
.args([
"--internal-http-port",
&self.internal_http_address.port().to_string(),
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.args([
@@ -701,20 +739,16 @@ impl Endpoint {
])
// TODO: It would be nice if we generated compute IDs with the same
// algorithm as the real control plane.
//
// TODO: Add this back when
// https://github.com/neondatabase/neon/pull/10747 is merged.
//
//.args([
// "--compute-id",
// &format!(
// "compute-{}",
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs()
// ),
//])
.args([
"--compute-id",
&format!(
"compute-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
),
])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);

View File

@@ -335,13 +335,21 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
checkpoint_timeout: settings
.remove("checkpoint_timeout")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'checkpoint_timeout' as duration")?,
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_period: settings
.remove("compaction_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'compaction_period' as duration")?,
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
@@ -387,7 +395,10 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
gc_period: settings.remove("gc_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'gc_period' as duration")?,
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
@@ -403,13 +414,20 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
pitr_interval: settings.remove("pitr_interval")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'pitr_interval' as duration")?,
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lagging_wal_timeout' as duration")?,
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
@@ -427,8 +445,14 @@ impl PageServerNode {
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
heatmap_period: settings
.remove("heatmap_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'heatmap_period' as duration")?,
lazy_slru_download: settings
.remove("lazy_slru_download")
.map(|x| x.parse::<bool>())
@@ -439,10 +463,15 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length: settings.remove("lsn_lease_length")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length' as duration")?,
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
timeline_offloading: settings
.remove("timeline_offloading")
.map(|x| x.parse::<bool>())

View File

@@ -47,6 +47,9 @@ enum Command {
listen_http_addr: String,
#[arg(long)]
listen_http_port: u16,
#[arg(long)]
listen_https_port: Option<u16>,
#[arg(long)]
availability_zone_id: String,
},
@@ -394,6 +397,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id,
} => {
storcon_client
@@ -406,6 +410,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
}),
)
@@ -954,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
threshold: threshold.into(),
},
)),
heatmap_period: Some("300s".to_string()),
heatmap_period: Some(Duration::from_secs(300)),
..Default::default()
},
})

View File

@@ -77,4 +77,5 @@ echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
-S ${SPEC_FILE}

View File

@@ -81,15 +81,8 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in $FAILED $CONTRIB_FAILED; do
dn="$(basename $d)"
rm -rf $dn
mkdir $dn
docker cp $TEST_CONTAINER_NAME:$d/regression.diffs $dn || [ $? -eq 1 ]
docker cp $TEST_CONTAINER_NAME:$d/regression.out $dn || [ $? -eq 1 ]
cat $dn/regression.out $dn/regression.diffs || true
rm -rf $dn
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
done
rm -rf $FAILED
exit 1
fi
fi

View File

@@ -0,0 +1,5 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./regress --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger

View File

@@ -0,0 +1,24 @@
diff --git a/test/sql/base.sql b/test/sql/base.sql
index 53adb30..2eed91b 100644
--- a/test/sql/base.sql
+++ b/test/sql/base.sql
@@ -2,7 +2,6 @@
BEGIN;
\i test/pgtap-core.sql
-CREATE EXTENSION semver;
SELECT plan(334);
--SELECT * FROM no_plan();
diff --git a/test/sql/corpus.sql b/test/sql/corpus.sql
index c0fe98e..39cdd2e 100644
--- a/test/sql/corpus.sql
+++ b/test/sql/corpus.sql
@@ -4,7 +4,6 @@ BEGIN;
-- Test the SemVer corpus from https://regex101.com/r/Ly7O1x/3/.
\i test/pgtap-core.sql
-CREATE EXTENSION semver;
SELECT plan(76);
--SELECT * FROM no_plan();

View File

@@ -1,6 +1,7 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
patch -p1 <test-upgrade-${PG_VERSION}.patch
psql -d contrib_regression -c "DROP EXTENSION IF EXISTS pgtap"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression base corpus

View File

@@ -1,3 +1,16 @@
diff --git a/Makefile b/Makefile
index f255fe6..0a0fa65 100644
--- a/Makefile
+++ b/Makefile
@@ -346,7 +346,7 @@ test: test-serial test-parallel
TB_DIR = test/build
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
index ba355ed..7e250f5 100644
--- a/test/schedule/create.sql

View File

@@ -2,5 +2,4 @@
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing
make installcheck

View File

@@ -2,4 +2,5 @@
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression plv8 plv8-errors scalar_args inline json startup_pre startup varparam json_conv jsonb_conv window guc es6 arraybuffer composites currentresource startup_perms bytea find_function_perms memory_limits reset show array_spread regression dialect bigint procedure
REGRESS="$(make -n installcheck | awk '{print substr($0,index($0,"init-extension")+15);}')"
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression ${REGRESS}

View File

@@ -43,7 +43,8 @@ EXTENSIONS='[
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"},
{"extname": "pgtap", "extdir": "pgtap-src"}
{"extname": "pgtap", "extdir": "pgtap-src"},
{"extname": "pg_repack", "extdir": "pg_repack-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
@@ -59,6 +60,8 @@ wait_for_ready
docker compose cp ext-src neon-test-extensions:/
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"

View File

@@ -252,7 +252,7 @@ pub enum ComputeMode {
Replica,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,
pub name: Option<String>,
@@ -283,7 +283,7 @@ pub struct DeltaOp {
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -292,7 +292,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -308,7 +308,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,

View File

@@ -2,7 +2,6 @@ use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use once_cell::sync::Lazy;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
@@ -58,38 +57,30 @@ pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
// Resolve the line and function for each location.
backtrace::resolve(loc.address as *mut c_void, |symbol| {
let Some(symname) = symbol.name() else {
let Some(symbol_name) = symbol.name() else {
return;
};
let mut name = symname.to_string();
// Strip the Rust monomorphization suffix from the symbol name.
static SUFFIX_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex"));
if let Some(m) = SUFFIX_REGEX.find(&name) {
name.truncate(m.start());
}
let function_id = match functions.get(&name) {
Some(function) => function.id,
None => {
let id = functions.len() as u64 + 1;
let system_name = String::from_utf8_lossy(symname.as_bytes());
let function_name = format!("{symbol_name:#}");
let functions_len = functions.len();
let function_id = functions
.entry(function_name)
.or_insert_with_key(|function_name| {
let function_id = functions_len as u64 + 1;
let system_name = String::from_utf8_lossy(symbol_name.as_bytes());
let filename = symbol
.filename()
.map(|path| path.to_string_lossy())
.unwrap_or(Cow::Borrowed(""));
let function = Function {
id,
name: string_id(&name),
Function {
id: function_id,
name: string_id(function_name),
system_name: string_id(&system_name),
filename: string_id(&filename),
..Default::default()
};
functions.insert(name, function);
id
}
};
}
})
.id;
loc.line.push(Line {
function_id,
line: symbol.lineno().unwrap_or(0) as i64,

View File

@@ -122,6 +122,8 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
} else {
None
},
validate_wal_contiguity: None,
}
}
}
@@ -544,10 +547,11 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
// This value needs to be tuned to avoid OOM. We have 3/4 of the total CPU threads to do background works, that's 16*3/4=9 on
// most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole
// calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50;
// This value needs to be tuned to avoid OOM. We have 3/4*CPUs threads for L0 compaction, that's
// 3/4*16=9 on most of our pageservers. Compacting 20 layers requires about 1 GB memory (could
// be reduced later by optimizing L0 hole calculation to avoid loading all keys into memory). So
// with this config, we can get a maximum peak compaction usage of 9 GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 20;
pub const DEFAULT_COMPACTION_L0_FIRST: bool = false;
pub const DEFAULT_COMPACTION_L0_SEMAPHORE: bool = true;

View File

@@ -57,6 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub availability_zone_id: AvailabilityZone,
}
@@ -105,6 +106,7 @@ pub struct TenantLocateResponseShard {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
}
#[derive(Serialize, Deserialize)]
@@ -148,6 +150,7 @@ pub struct NodeDescribeResponse {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub listen_pg_addr: String,
pub listen_pg_port: u16,

View File

@@ -526,9 +526,13 @@ pub struct TenantConfigPatch {
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub checkpoint_timeout: Option<Duration>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub compaction_period: Option<Duration>,
pub compaction_threshold: Option<usize>,
pub compaction_upper_limit: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
@@ -539,22 +543,38 @@ pub struct TenantConfig {
pub l0_flush_stall_threshold: Option<usize>,
pub l0_flush_wait_upload: Option<bool>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub gc_period: Option<Duration>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub eviction_policy: Option<EvictionPolicy>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub heatmap_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub heatmap_period: Option<Duration>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub image_creation_preempt_threshold: Option<usize>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Option<Duration>,
pub timeline_offloading: Option<bool>,
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
pub rel_size_v2_enabled: Option<bool>,
@@ -564,7 +584,10 @@ pub struct TenantConfig {
}
impl TenantConfig {
pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
pub fn apply_patch(
self,
patch: TenantConfigPatch,
) -> Result<TenantConfig, humantime::DurationError> {
let Self {
mut checkpoint_distance,
mut checkpoint_timeout,
@@ -604,11 +627,17 @@ impl TenantConfig {
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
patch
.checkpoint_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut checkpoint_timeout);
patch
.compaction_target_size
.apply(&mut compaction_target_size);
patch.compaction_period.apply(&mut compaction_period);
patch
.compaction_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut compaction_period);
patch.compaction_threshold.apply(&mut compaction_threshold);
patch
.compaction_upper_limit
@@ -626,15 +655,25 @@ impl TenantConfig {
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch.gc_period.apply(&mut gc_period);
patch
.gc_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut gc_period);
patch
.image_creation_threshold
.apply(&mut image_creation_threshold);
patch.pitr_interval.apply(&mut pitr_interval);
patch
.pitr_interval
.map(|v| humantime::parse_duration(&v))?
.apply(&mut pitr_interval);
patch
.walreceiver_connect_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut walreceiver_connect_timeout);
patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
patch
.lagging_wal_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lagging_wal_timeout);
patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
patch.eviction_policy.apply(&mut eviction_policy);
patch
@@ -642,8 +681,12 @@ impl TenantConfig {
.apply(&mut min_resident_size_override);
patch
.evictions_low_residence_duration_metric_threshold
.map(|v| humantime::parse_duration(&v))?
.apply(&mut evictions_low_residence_duration_metric_threshold);
patch.heatmap_period.apply(&mut heatmap_period);
patch
.heatmap_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut heatmap_period);
patch.lazy_slru_download.apply(&mut lazy_slru_download);
patch
.timeline_get_throttle
@@ -654,9 +697,13 @@ impl TenantConfig {
patch
.image_creation_preempt_threshold
.apply(&mut image_creation_preempt_threshold);
patch.lsn_lease_length.apply(&mut lsn_lease_length);
patch
.lsn_lease_length
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length);
patch
.lsn_lease_length_for_ts
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length_for_ts);
patch.timeline_offloading.apply(&mut timeline_offloading);
patch
@@ -673,7 +720,7 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
Self {
Ok(Self {
checkpoint_distance,
checkpoint_timeout,
compaction_target_size,
@@ -709,7 +756,7 @@ impl TenantConfig {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
}
})
}
}
@@ -2503,7 +2550,7 @@ mod tests {
..base.clone()
};
let patched = base.apply_patch(decoded.config);
let patched = base.apply_patch(decoded.config).unwrap();
assert_eq!(patched, expected);
}
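The models.rs change above switches the duration-valued tenant config fields from `Option<String>` to `Option<Duration>` (deserialized via `humantime_serde`) and makes `apply_patch` fallible, returning `humantime::DurationError` when a patch value does not parse. A minimal standalone sketch of that parsing pattern, assuming the `humantime` crate and using a hypothetical helper rather than the real `FieldPatch` machinery:

```rust
// Sketch only: humantime parsing for an optional duration field; the real
// code goes through the TenantConfigPatch/FieldPatch types shown above.
use std::time::Duration;

fn apply_duration_patch(
    target: &mut Option<Duration>,
    patch: Option<&str>,
) -> Result<(), humantime::DurationError> {
    if let Some(raw) = patch {
        // humantime accepts values like "10s", "5m" or "2h 30m".
        *target = Some(humantime::parse_duration(raw)?);
    }
    Ok(())
}

fn main() -> Result<(), humantime::DurationError> {
    let mut heatmap_period: Option<Duration> = None;
    apply_duration_patch(&mut heatmap_period, Some("5m"))?;
    assert_eq!(heatmap_period, Some(Duration::from_secs(300)));

    // Malformed strings now surface as an error at patch time instead of
    // being stored verbatim and rejected later.
    assert!(apply_duration_patch(&mut heatmap_period, Some("not-a-duration")).is_err());
    Ok(())
}
```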

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
itertools.workspace = true
postgres.workspace = true
tokio-postgres.workspace = true
url.workspace = true

View File

@@ -171,10 +171,10 @@ impl PgConnectionConfig {
tokio_postgres::Client,
tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
),
postgres::Error,
tokio_postgres::Error,
> {
self.to_tokio_postgres_config()
.connect(postgres::NoTls)
.connect(tokio_postgres::NoTls)
.await
}
}

View File

@@ -278,7 +278,7 @@ pub fn generate_pg_control(
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64)> {
) -> anyhow::Result<(Bytes, u64, bool)> {
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),

View File

@@ -124,23 +124,59 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}
/// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
///
/// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
/// the pageserver. They use the same format as the PostgreSQL control file and the
/// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
/// 'lsn' is the LSN at which we're starting up.
///
/// Returns:
/// - pg_control file contents
/// - system_identifier, extracted from the persisted information
/// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
/// checkpoint at the given LSN
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64)> {
) -> anyhow::Result<(Bytes, u64, bool)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
//
// NB: In the checkpoint struct that we persist in the pageserver, we have a different
// convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
// 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
// the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
//
// We didn't always have this convention however, and old persisted records will have
// old REDO values that point to some old LSN.
//
// The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
// checkpoint record at that point in WAL, with no new WAL records after it. That case
// can be treated as starting from a clean shutdown. All other cases are treated as
// non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
// that distinction doesn't matter very much. As of this writing, it only affects
// whether the persisted pg_stats information can be used or not.
//
// In the Checkpoint struct in the returned pg_control file, the redo pointer is
// always set to the LSN we're starting at, to hint that no WAL replay is required.
// (There's some neon-specific code in Postgres startup to make that work, though.
// Just setting the redo pointer is not sufficient.)
let was_shutdown = Lsn(checkpoint.redo) == lsn;
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//save new values in pg_control
// We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
// neon-specific code at postgres startup ignores the state stored in the control
// file, similar to archive recovery in standalone PostgreSQL. Similarly, the
// checkPoint pointer is ignored, so just set it to 0.
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier))
Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
}
pub fn get_current_timestamp() -> TimestampTz {
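The doc comment above describes how the persisted checkpoint's `redo` pointer doubles as a clean-shutdown marker. A simplified sketch of that check, with the FFI `ControlFileData`/`CheckPoint` types replaced by hypothetical plain structs:

```rust
// Sketch only: the clean-shutdown heuristic, with the control file reduced
// to hypothetical plain structs instead of the real postgres_ffi types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Lsn(u64);

struct CheckPoint {
    // Neon convention: end LSN of the shutdown checkpoint record,
    // or 0 for an online checkpoint.
    redo: u64,
}

fn was_clean_shutdown(checkpoint: &CheckPoint, basebackup_lsn: Lsn) -> bool {
    // Only a shutdown checkpoint with no WAL written after it leaves `redo`
    // pointing exactly at the LSN we are starting the basebackup from.
    Lsn(checkpoint.redo) == basebackup_lsn
}

fn main() {
    assert!(was_clean_shutdown(&CheckPoint { redo: 0x16B_9188 }, Lsn(0x16B_9188)));
    assert!(!was_clean_shutdown(&CheckPoint { redo: 0 }, Lsn(0x16B_9188)));
}
```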

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-protocol2"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -1,14 +1,12 @@
//! SASL-based authentication support.
use std::fmt::Write;
use std::{io, iter, mem, str};
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use std::fmt::Write;
use std::io;
use std::iter;
use std::mem;
use std::str;
use tokio::task::yield_now;
const NONCE_LENGTH: usize = 24;
@@ -493,11 +491,9 @@ mod test {
let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB";
let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB";
let server_first =
"r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\
let server_first = "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\
=4096";
let client_final =
"c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\
let client_final = "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\
1NTlQYNs5BTeQjdHdk7lOflDo5re2an8=";
let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw=";

View File

@@ -11,9 +11,10 @@
//! set to `UTF8`. It will most likely not behave properly if that is not the case.
#![warn(missing_docs, clippy::all)]
use std::io;
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use std::io;
pub mod authentication;
pub mod escape;

View File

@@ -1,13 +1,13 @@
#![allow(missing_docs)]
use std::io::{self, Read};
use std::ops::Range;
use std::{cmp, str};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use memchr::memchr;
use std::cmp;
use std::io::{self, Read};
use std::ops::Range;
use std::str;
use crate::Oid;

View File

@@ -1,13 +1,13 @@
//! Frontend message serialization.
#![allow(missing_docs)]
use std::error::Error;
use std::{io, marker};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, BytesMut};
use std::error::Error;
use std::io;
use std::marker;
use crate::{write_nullable, FromUsize, IsNull, Oid};
use crate::{FromUsize, IsNull, Oid, write_nullable};
#[inline]
fn write_body<F, E>(buf: &mut BytesMut, f: F) -> Result<(), E>

View File

@@ -6,12 +6,13 @@
//! side. This is good because it ensures the cleartext password won't
//! end up in logs pg_stat displays, etc.
use crate::authentication::sasl;
use hmac::{Hmac, Mac};
use rand::RngCore;
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use crate::authentication::sasl;
#[cfg(test)]
mod test;

View File

@@ -1,11 +1,12 @@
//! Conversions to and from Postgres's binary format for various types.
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, BytesMut};
use fallible_iterator::FallibleIterator;
use std::boxed::Box as StdBox;
use std::error::Error;
use std::str;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, BytesMut};
use fallible_iterator::FallibleIterator;
use crate::Oid;
#[cfg(test)]

View File

@@ -1,7 +1,7 @@
[package]
name = "postgres-types2"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]

View File

@@ -4,19 +4,18 @@
//! unless you want to define your own `ToSql` or `FromSql` definitions.
#![warn(clippy::all, missing_docs)]
use fallible_iterator::FallibleIterator;
use postgres_protocol2::types;
use std::any::type_name;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use crate::type_gen::{Inner, Other};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
#[doc(inline)]
pub use postgres_protocol2::Oid;
use postgres_protocol2::types;
use bytes::BytesMut;
use crate::type_gen::{Inner, Other};
/// Generates a simple implementation of `ToSql::accepts` which accepts the
/// types passed to it.

View File

@@ -1,7 +1,9 @@
use crate::{FromSql, Type};
pub use bytes::BytesMut;
use std::error::Error;
pub use bytes::BytesMut;
use crate::{FromSql, Type};
pub fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
if buf.len() < 4 {
return Err("invalid buffer size".into());

View File

@@ -1,22 +1,19 @@
[package]
name = "tokio-postgres2"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]
async-trait.workspace = true
bytes.workspace = true
byteorder.workspace = true
fallible-iterator.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log = "0.4"
parking_lot.workspace = true
percent-encoding = "2.0"
pin-project-lite.workspace = true
phf = "0.11"
postgres-protocol2 = { path = "../postgres-protocol2" }
postgres-types2 = { path = "../postgres-types2" }
tokio = { workspace = true, features = ["io-util", "time", "net"] }
tokio-util = { workspace = true, features = ["codec"] }
serde = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }

View File

@@ -1,10 +1,11 @@
use std::io;
use tokio::net::TcpStream;
use crate::client::SocketConfig;
use crate::config::{Host, SslMode};
use crate::tls::MakeTlsConnect;
use crate::{cancel_query_raw, connect_socket, Error};
use std::io;
use crate::{Error, cancel_query_raw, connect_socket};
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
@@ -22,7 +23,7 @@ where
return Err(Error::connect(io::Error::new(
io::ErrorKind::InvalidInput,
"unknown host",
)))
)));
}
};

View File

@@ -1,10 +1,11 @@
use crate::config::SslMode;
use crate::tls::TlsConnect;
use crate::{connect_tls, Error};
use bytes::BytesMut;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use crate::config::SslMode;
use crate::tls::TlsConnect;
use crate::{Error, connect_tls};
pub async fn cancel_query_raw<S, T>(
stream: S,
mode: SslMode,

View File

@@ -1,12 +1,12 @@
use crate::config::SslMode;
use crate::tls::TlsConnect;
use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect};
use crate::{cancel_query_raw, Error};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use crate::client::SocketConfig;
use crate::config::SslMode;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Error, cancel_query, cancel_query_raw};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone, Serialize, Deserialize)]

View File

@@ -1,31 +1,28 @@
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::Host;
use crate::config::SslMode;
use crate::connection::{Request, RequestMessages};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, ToSql, Type};
use crate::{
query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
SimpleQueryMessage, Statement, Transaction, TransactionBuilder,
};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{future, ready, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol2::message::{backend::Message, frontend};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, future, ready};
use parking_lot::Mutex;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use std::time::Duration;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, ToSql, Type};
use crate::{
CancelToken, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, Transaction,
TransactionBuilder, query, simple_query, slice_iter,
};
pub struct Responses {
receiver: mpsc::Receiver<BackendMessages>,

View File

@@ -1,8 +1,9 @@
use std::io;
use bytes::{Buf, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend;
use postgres_protocol2::message::frontend::CopyData;
use std::io;
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {

View File

@@ -1,21 +1,19 @@
//! Connection configuration.
use crate::connect::connect;
use crate::connect_raw::connect_raw;
use crate::connect_raw::RawConnection;
use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect;
use crate::{Client, Connection, Error};
use postgres_protocol2::message::frontend::StartupMessageParams;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use std::{fmt, str};
pub use postgres_protocol2::authentication::sasl::ScramKeys;
use postgres_protocol2::message::frontend::StartupMessageParams;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use crate::connect::connect;
use crate::connect_raw::{RawConnection, connect_raw};
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Connection, Error};
/// TLS configuration.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]

View File

@@ -1,3 +1,7 @@
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::client::SocketConfig;
use crate::codec::BackendMessage;
use crate::config::Host;
@@ -5,9 +9,6 @@ use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
pub async fn connect<T>(
mut tls: T,

View File

@@ -1,22 +1,24 @@
use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready};
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, AuthKeys, Config};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
use crate::Error;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
use postgres_protocol2::message::frontend;
use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
@@ -158,7 +160,7 @@ where
| Some(Message::AuthenticationSspi) => {
return Err(Error::authentication(
"unsupported authentication method".into(),
))
));
}
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
Some(_) => return Err(Error::unexpected_message()),

View File

@@ -1,11 +1,13 @@
use crate::config::Host;
use crate::Error;
use std::future::Future;
use std::io;
use std::time::Duration;
use tokio::net::{self, TcpStream};
use tokio::time;
use crate::Error;
use crate::config::Host;
pub(crate) async fn connect_socket(
host: &Host,
port: u16,

View File

@@ -1,12 +1,13 @@
use crate::config::SslMode;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::private::ForcePrivateApi;
use crate::tls::TlsConnect;
use crate::Error;
use bytes::BytesMut;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::Error;
use crate::config::SslMode;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::TlsConnect;
use crate::tls::private::ForcePrivateApi;
pub async fn connect_tls<S, T>(
mut stream: S,
mode: SslMode,
@@ -19,7 +20,7 @@ where
match mode {
SslMode::Disable => return Ok(MaybeTlsStream::Raw(stream)),
SslMode::Prefer if !tls.can_connect(ForcePrivateApi) => {
return Ok(MaybeTlsStream::Raw(stream))
return Ok(MaybeTlsStream::Raw(stream));
}
SslMode::Prefer | SslMode::Require => {}
}

View File

@@ -1,22 +1,24 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Sink, Stream};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use log::{info, trace};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tokio_util::sync::PollSender;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
pub enum RequestMessages {
Single(FrontendMessage),
}
@@ -139,7 +141,7 @@ where
Some(response) => response,
None => match messages.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(error)) => {
return Poll::Ready(Err(Error::db(error)))
return Poll::Ready(Err(Error::db(error)));
}
_ => return Poll::Ready(Err(Error::unexpected_message())),
},

View File

@@ -1,10 +1,10 @@
//! Errors.
use std::error::{self, Error as _Error};
use std::{fmt, io};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody};
use std::error::{self, Error as _Error};
use std::fmt;
use std::io;
pub use self::sqlstate::*;

View File

@@ -1,9 +1,10 @@
#![allow(async_fn_in_trait)]
use postgres_protocol2::Oid;
use crate::query::RowStream;
use crate::types::Type;
use crate::{Client, Error, Transaction};
use postgres_protocol2::Oid;
mod private {
pub trait Sealed {}

View File

@@ -1,6 +1,8 @@
//! An asynchronous, pipelined, PostgreSQL client.
#![warn(clippy::all)]
use postgres_protocol2::message::backend::ReadyForQueryBody;
pub use crate::cancel_token::CancelToken;
pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
@@ -17,7 +19,6 @@ pub use crate::tls::NoTls;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
use postgres_protocol2::message::backend::ReadyForQueryBody;
/// After executing a query, the connection will be in one of these states
#[derive(Clone, Copy, Debug, PartialEq)]

View File

@@ -1,12 +1,14 @@
//! MaybeTlsStream.
//!
//! Represents a stream that may or may not be encrypted with TLS.
use crate::tls::{ChannelBinding, TlsStream};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::tls::{ChannelBinding, TlsStream};
/// A stream that may or may not be encrypted with TLS.
pub enum MaybeTlsStream<S, T> {
/// An unencrypted stream.

View File

@@ -1,18 +1,19 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{Field, Kind, Oid, Type};
use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{pin_mut, TryStreamExt};
use log::debug;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::{Column, Error, Statement, query, slice_iter};
pub(crate) const TYPEINFO_QUERY: &str = "\
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid

View File

@@ -1,22 +1,24 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use log::{debug, log_enabled, Level};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::{Level, debug, log_enabled};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]);
impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
@@ -257,7 +259,7 @@ impl Stream for RowStream {
this.statement.clone(),
body,
*this.output_format,
)?)))
)?)));
}
Message::EmptyQueryResponse | Message::PortalSuspended => {}
Message::CommandComplete(body) => {

View File

@@ -1,17 +1,18 @@
//! Rows.
use std::ops::Range;
use std::sync::Arc;
use std::{fmt, str};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend::DataRowBody;
use postgres_types2::{Format, WrongFormat};
use crate::row::sealed::{AsName, Sealed};
use crate::simple_query::SimpleColumn;
use crate::statement::Column;
use crate::types::{FromSql, Type, WrongType};
use crate::{Error, Statement};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend::DataRowBody;
use postgres_types2::{Format, WrongFormat};
use std::fmt;
use std::ops::Range;
use std::str;
use std::sync::Arc;
mod sealed {
pub trait Sealed {}

View File

@@ -1,19 +1,21 @@
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
/// Information about a column of a single query row.
#[derive(Debug)]
pub struct SimpleColumn {

View File

@@ -1,15 +1,14 @@
use std::fmt;
use std::sync::{Arc, Weak};
use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;
use postgres_protocol2::message::frontend;
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::Type;
use postgres_protocol2::{
message::{backend::Field, frontend},
Oid,
};
use std::{
fmt,
sync::{Arc, Weak},
};
struct StatementInner {
client: Weak<InnerClient>,

View File

@@ -5,6 +5,7 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub(crate) mod private {

View File

@@ -1,8 +1,9 @@
use postgres_protocol2::message::frontend;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::query::RowStream;
use crate::{CancelToken, Client, Error, ReadyForQueryStatus};
use postgres_protocol2::message::frontend;
/// A representation of a PostgreSQL database transaction.
///

View File

@@ -24,11 +24,10 @@ diatomic-waker.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
inferno.workspace = true
fail.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = {workspace = true, features = [ "ioctl" ] }
nix = { workspace = true, features = ["ioctl"] }
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
@@ -62,6 +61,7 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
pprof.workspace = true
serde_assert.workspace = true
tokio = { workspace = true, features = ["test-util"] }

View File

@@ -0,0 +1,26 @@
## Utils Benchmarks
To run benchmarks:
```sh
# All benchmarks.
cargo bench --package utils
# Specific file.
cargo bench --package utils --bench benchmarks
# Specific benchmark.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true
# List available benchmarks.
cargo bench --package utils --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.
Benchmarks are automatically compared against the previous run. To compare against other runs, see
`--baseline` and `--save-baseline`.

View File

@@ -1,5 +1,18 @@
use criterion::{criterion_group, criterion_main, Criterion};
use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::warn_slow;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_id_stringify,
bench_warn_slow,
);
criterion_main!(benches);
pub fn bench_id_stringify(c: &mut Criterion) {
// Can only use public methods.
@@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) {
});
}
criterion_group!(benches, bench_id_stringify);
criterion_main!(benches);
pub fn bench_warn_slow(c: &mut Criterion) {
for enabled in [false, true] {
c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
run_bench(b, enabled).unwrap()
});
}
// The actual benchmark.
fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> {
const THRESHOLD: Duration = Duration::from_secs(1);
// Use a multi-threaded runtime to avoid thread parking overhead when yielding.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
// Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
// paths for a ready future.
if enabled {
b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
} else {
b.iter(|| runtime.block_on(tokio::task::yield_now()));
}
Ok(())
}
}

View File

@@ -1,9 +1,13 @@
use std::future::Future;
use std::str::FromStr;
use std::time::Duration;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::warn;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString {
}
}
/// Logs a periodic warning if a future is slow to complete.
///
/// This is performance-sensitive as it's used on the GetPage read path.
#[inline]
pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut f = Box::pin(f);
let started = Instant::now();
let mut attempt = 1;
loop {
// NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
// case where the timeout doesn't fire.
let deadline = started + attempt * threshold;
if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
// NB: we check if we exceeded the threshold even if the timeout never fired, because
// scheduling or execution delays may cause the future to succeed even if it exceeds the
// timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
// false negatives.
let elapsed = started.elapsed();
if elapsed >= threshold {
warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
}
return output;
}
let elapsed = started.elapsed().as_secs_f64();
warn!("slow {name} still running after {elapsed:.3}s",);
attempt += 1;
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};
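The `warn_slow` helper added above polls the wrapped future with `timeout_at` and logs a warning each time another multiple of the threshold passes, plus a final warning if the future ultimately exceeded the threshold. A hedged usage sketch with an illustrative operation name and threshold; a `tracing` subscriber must be installed to actually see the output:

```rust
// Sketch only: wrapping an arbitrary future with the warn_slow helper from
// the diff above. Operation name and threshold are illustrative.
use std::time::Duration;
use utils::logging::warn_slow;

#[tokio::main]
async fn main() {
    // With a 100ms threshold, a 350ms sleep produces "still running" warnings
    // at roughly 100ms, 200ms and 300ms, then "completed after 0.350s".
    let result = warn_slow("example_sleep", Duration::from_millis(100), async {
        tokio::time::sleep(Duration::from_millis(350)).await;
        42
    })
    .await;
    assert_eq!(result, 42);
}
```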

View File

@@ -1,7 +1,7 @@
[package]
name = "vm_monitor"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true
[[bin]]

View File

@@ -1,12 +1,10 @@
use std::fmt::{self, Debug, Formatter};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use cgroups_rs::{
hierarchies::{self, is_cgroup2_unified_mode},
memory::MemController,
Subsystem,
};
use anyhow::{Context, anyhow};
use cgroups_rs::Subsystem;
use cgroups_rs::hierarchies::{self, is_cgroup2_unified_mode};
use cgroups_rs::memory::MemController;
use tokio::sync::watch;
use tracing::{info, warn};

View File

@@ -6,17 +6,15 @@
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use tracing::{debug, info};
use crate::protocol::{
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
OutboundMsg, OutboundMsgKind, PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION, ProtocolRange,
ProtocolResponse, ProtocolVersion,
};
/// The central handler for all communications in the monitor.

View File

@@ -2,12 +2,14 @@
use std::num::NonZeroU64;
use crate::MiB;
use anyhow::{anyhow, Context};
use tokio_postgres::{types::ToSql, Client, NoTls, Row};
use anyhow::{Context, anyhow};
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls, Row};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use crate::MiB;
/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {

View File

@@ -2,24 +2,26 @@
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]
use std::fmt::Debug;
use std::net::SocketAddr;
use std::time::Duration;
use anyhow::Context;
use axum::{
extract::{ws::WebSocket, State, WebSocketUpgrade},
response::Response,
};
use axum::{routing::get, Router};
use axum::Router;
use axum::extract::ws::WebSocket;
use axum::extract::{State, WebSocketUpgrade};
use axum::response::Response;
use axum::routing::get;
use clap::Parser;
use futures::Future;
use std::net::SocketAddr;
use std::{fmt::Debug, time::Duration};
use runner::Runner;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::net::TcpListener;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use runner::Runner;
// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;

View File

@@ -35,7 +35,8 @@
use core::fmt;
use std::cmp;
use serde::{de::Error, Deserialize, Serialize};
use serde::de::Error;
use serde::{Deserialize, Serialize};
/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]

View File

@@ -7,7 +7,7 @@
use std::fmt::Debug;
use std::time::{Duration, Instant};
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
@@ -18,7 +18,7 @@ use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
use crate::{Args, MiB, bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel};
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
@@ -233,7 +233,9 @@ impl Runner {
//
// TODO: make the duration here configurable.
if last_time.elapsed() > Duration::from_secs(5) {
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
bail!(
"haven't gotten cgroup memory stats recently enough to determine downscaling information"
);
} else if last_history.samples_count <= 1 {
let status = "haven't received enough cgroup memory stats yet";
info!(status, "discontinuing downscale");

View File

@@ -5,6 +5,7 @@ package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
optional uint64 raw_wal_start_lsn = 3;
}
message InterpretedWalRecord {

View File

@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
pub next_record_lsn: Lsn,
// Inclusive start LSN of the PG WAL from which the interpreted
// WAL records were extracted. Note that this is not necessarily the
// start LSN of the first interpreted record in the batch.
pub raw_wal_start_lsn: Option<Lsn>,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver

View File

@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
.collect::<Result<Vec<_>, _>>()?;
Ok(proto::InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(|l| l.0),
next_record_lsn: Some(value.next_record_lsn.0),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
})
}
}
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
Ok(InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(Lsn::from),
next_record_lsn: value
.next_record_lsn
.map(Lsn::from)
.expect("Always provided"),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
})
}
}

View File

@@ -40,7 +40,6 @@ num_cpus.workspace = true
num-traits.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true

View File

@@ -21,5 +21,4 @@ tokio.workspace = true
futures.workspace = true
tokio-util.workspace = true
anyhow.workspace = true
postgres.workspace = true
bytes.workspace = true

View File

@@ -34,7 +34,8 @@ pub struct BasebackupRequest {
impl Client {
pub async fn new(connstring: String) -> anyhow::Result<Self> {
let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let (client, connection) =
tokio_postgres::connect(&connstring, tokio_postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({

View File

@@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(3, 1) => AuxFileV2::Recognized("pg_stat/pgstat.stat", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_PG_STAT: u8 = 0x03;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -53,6 +54,7 @@ const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// * pg_logical/replorigin_checkpoint -> 0x0103
/// * pg_logical/others -> 0x01FF
/// * pg_replslot/ -> 0x0201
/// * pg_stat/pgstat.stat -> 0x0301
/// * others -> 0xFFFF
///
/// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
@@ -75,6 +77,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_stat/") {
aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(
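The encoding above reserves the prefix byte pair `0x03 0x01` for `pg_stat/`, alongside the existing `pg_logical/` and `pg_replslot/` prefixes, before hashing the file name into the rest of the key. A simplified sketch of just the prefix selection, following the mapping in the doc comment; the hash of the remaining file name and the 128-bit `Key` layout are left out:

```rust
// Sketch only: the directory-prefix dispatch behind encode_aux_file_key,
// per the doc comment above. Hashing and key construction are omitted.
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_PG_STAT: u8 = 0x03;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;

fn aux_prefix(path: &str) -> (u8, u8) {
    if path.starts_with("pg_logical/mappings/") {
        (AUX_DIR_PG_LOGICAL, 0x01)
    } else if path.starts_with("pg_logical/snapshots/") {
        (AUX_DIR_PG_LOGICAL, 0x02)
    } else if path == "pg_logical/replorigin_checkpoint" {
        (AUX_DIR_PG_LOGICAL, 0x03)
    } else if path.starts_with("pg_logical/") {
        (AUX_DIR_PG_LOGICAL, 0xFF)
    } else if path.starts_with("pg_replslot/") {
        (AUX_DIR_PG_REPLSLOT, 0x01)
    } else if path.starts_with("pg_stat/") {
        // New in this change: pg_stat/pgstat.stat encodes as 0x0301.
        (AUX_DIR_PG_STAT, 0x01)
    } else {
        (AUX_DIR_PG_UNKNOWN, 0xFF)
    }
}

fn main() {
    assert_eq!(aux_prefix("pg_stat/pgstat.stat"), (0x03, 0x01));
    assert_eq!(aux_prefix("pg_logical/snapshots/0-0.snap"), (0x01, 0x02));
    assert_eq!(aux_prefix("some/other/file"), (0xFF, 0xFF));
}
```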

View File

@@ -264,6 +264,31 @@ where
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
// TODO include checksum
// Construct the pg_control file from the persisted checkpoint and pg_control
// information. But we only add this to the tarball at the end, so that if the
// writing is interrupted half-way through, the resulting incomplete tarball will
// be missing the pg_control file, which prevents PostgreSQL from starting up on
// it. With proper error handling, you should never try to start up from an
// incomplete basebackup in the first place, of course, but this is a nice little
// extra safety measure.
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed to get control bytes")?;
let (pg_control_bytes, system_identifier, was_shutdown) =
postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let pgversion = self.timeline.pg_version;
@@ -401,6 +426,10 @@ where
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
} else if path == "pg_stat/pgstat.stat" && !was_shutdown {
// Drop statistic in case of abnormal termination, i.e. if we're not starting from the exact LSN
// of a shutdown checkpoint.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
@@ -462,8 +491,9 @@ where
)))
});
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
// Last, add the pg_control file and bootstrap WAL segment.
self.add_pgcontrol_file(pg_control_bytes, system_identifier)
.await?;
self.ar
.finish()
.await
@@ -671,7 +701,11 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -694,24 +728,6 @@ where
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar

View File

@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

View File

@@ -197,6 +197,10 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,
/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}
/// Token for authentication to safekeepers
@@ -360,6 +364,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;
let mut conf = PageServerConf {
@@ -446,6 +451,7 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};
// ------------------------------------------------------------

View File

@@ -98,6 +98,7 @@ pub struct RequestContext {
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
read_path_debug: bool,
}
/// The kind of access to the page cache.
@@ -155,6 +156,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
read_path_debug: false,
},
}
}
@@ -168,6 +170,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
read_path_debug: original.read_path_debug,
},
}
}
@@ -191,6 +194,11 @@ impl RequestContextBuilder {
self
}
pub(crate) fn read_path_debug(mut self, b: bool) -> Self {
self.inner.read_path_debug = b;
self
}
pub fn build(self) -> RequestContext {
self.inner
}
@@ -291,4 +299,8 @@ impl RequestContext {
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
}
pub(crate) fn read_path_debug(&self) -> bool {
self.read_path_debug
}
}

View File

@@ -173,6 +173,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: None, // TODO: Support https.
availability_zone_id: az_id.expect("Checked above"),
})
}

View File

@@ -68,6 +68,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use crate::config::PageServerConf;
use crate::context::RequestContextBuilder;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -2394,6 +2395,7 @@ async fn timeline_checkpoint_handler(
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e)
}
)?;
@@ -2571,14 +2573,30 @@ async fn deletion_queue_flush(
}
}
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
async fn getpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
getpage_at_lsn_handler_inner(false, request, cancel).await
}
async fn touchpage_at_lsn_handler(
request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
getpage_at_lsn_handler_inner(true, request, cancel).await
}
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
async fn getpage_at_lsn_handler_inner(
touch: bool,
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// Require pageserver admin permission for this API instead of only tenant-level token.
check_permission(&request, None)?;
let state = get_state(&request);
struct Key(pageserver_api::key::Key);
@@ -2593,22 +2611,29 @@ async fn getpage_at_lsn_handler(
let key: Key = parse_query_param(&request, "key")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
let lsn: Option<Lsn> = parse_query_param(&request, "lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Enable read path debugging
let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build();
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
// Use last_record_lsn if no lsn is provided
let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let page = timeline.get(key.0, lsn, &ctx).await?;
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
if touch {
json_response(StatusCode::OK, ())
} else {
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
}
}
.instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
@@ -3743,6 +3768,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",
|r| api_handler(r, touchpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| api_handler(r, timeline_collect_keyspace),

View File

@@ -34,11 +34,13 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::logging::warn_slow;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
@@ -81,6 +83,9 @@ use std::os::fd::AsRawFd;
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log a warning about slow GetPage requests.
const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
///////////////////////////////////////////////////////////////////////////////
pub struct Listener {
@@ -594,6 +599,7 @@ struct BatchedTestRequest {
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
enum BatchedFeMessage {
Exists {
span: Span,
@@ -638,6 +644,10 @@ enum BatchedFeMessage {
}
impl BatchedFeMessage {
fn as_static_str(&self) -> &'static str {
self.into()
}
fn observe_execution_start(&mut self, at: Instant) {
match self {
BatchedFeMessage::Exists { timer, .. }
@@ -1463,17 +1473,20 @@ impl PageServerHandler {
}
};
let err = self
.pagesteam_handle_batched_message(
let result = warn_slow(
msg.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
protocol_version,
ctx,
)
.await;
match err {
),
)
.await;
match result {
Ok(()) => {}
Err(e) => break e,
}
@@ -1636,13 +1649,17 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
warn_slow(
batch.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
),
)
.await?;
}
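The page service change above wraps batched pagestream handling in `warn_slow` with a 30-second threshold and derives `strum_macros::IntoStaticStr` on `BatchedFeMessage` so `as_static_str` can supply a cheap, allocation-free operation name. A minimal sketch of that derive on a hypothetical enum, assuming the `strum_macros` crate:

```rust
// Sketch only: how IntoStaticStr provides the &'static str variant name used
// by as_static_str in the diff above. The enum here is hypothetical.
use strum_macros::IntoStaticStr;

#[allow(dead_code)]
#[derive(IntoStaticStr)]
enum BatchedMessage {
    Exists { request_id: u64 },
    GetPage { request_id: u64 },
}

impl BatchedMessage {
    fn as_static_str(&self) -> &'static str {
        // The derive implements From<&BatchedMessage> for &'static str,
        // returning the variant name without allocating.
        self.into()
    }
}

fn main() {
    let msg = BatchedMessage::GetPage { request_id: 7 };
    assert_eq!(msg.as_static_str(), "GetPage");
}
```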

Some files were not shown because too many files have changed in this diff Show More