Compare commits

..

1 Commits

Author SHA1 Message Date
Alexey Kondratov
6cfaa775b0 chore(compute): Minor compute_ctl startup refactoring 2025-02-26 21:55:25 +01:00
167 changed files with 1797 additions and 2705 deletions

View File

@@ -1,25 +0,0 @@
# Expects response from https://docs.github.com/en/rest/releases/releases?apiVersion=2022-11-28#list-releases as input,
# with tag names `release` for storage, `release-compute` for compute and `release-proxy` for proxy releases.
# Extract only the `tag_name` field from each release object
[ .[].tag_name ]
# Transform each tag name into a structured object using regex capture
| reduce map(
capture("^(?<full>release(-(?<component>proxy|compute))?-(?<version>\\d+))$")
| {
component: (.component // "storage"), # Default to "storage" if no component is specified
version: (.version | tonumber), # Convert the version number to an integer
full: .full # Store the full tag name for final output
}
)[] as $entry # Loop over the transformed list
# Accumulate the latest (highest-numbered) version for each component
({};
.[$entry.component] |= (if . == null or $entry.version > .version then $entry else . end))
# Convert the resulting object into an array of formatted strings
| to_entries
| map("\(.key)=\(.value.full)")
# Output each string separately
| .[]

View File

@@ -1,103 +0,0 @@
name: Generate run metadata
on:
workflow_call:
inputs:
github-event-name:
type: string
required: true
outputs:
build-tag:
description: "Tag for the current workflow run"
value: ${{ jobs.tags.outputs.build-tag }}
previous-storage-release:
description: "Tag of the last storage release"
value: ${{ jobs.tags.outputs.storage }}
previous-proxy-release:
description: "Tag of the last proxy release"
value: ${{ jobs.tags.outputs.proxy }}
previous-compute-release:
description: "Tag of the last compute release"
value: ${{ jobs.tags.outputs.compute }}
run-kind:
description: "The kind of run we're currently in. Will be one of `pr`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
value: ${{ jobs.tags.outputs.run-kind }}
permissions: {}
jobs:
tags:
runs-on: ubuntu-22.04
outputs:
build-tag: ${{ steps.build-tag.outputs.tag }}
compute: ${{ steps.previous-releases.outputs.compute }}
proxy: ${{ steps.previous-releases.outputs.proxy }}
storage: ${{ steps.previous-releases.outputs.storage }}
run-kind: ${{ steps.run-kind.outputs.run-kind }}
permissions:
contents: read
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get run kind
id: run-kind
env:
RUN_KIND: >-
${{
'compute-rc-pr'
|| (inputs.github-event-name == 'push' && github.ref_name == 'main') && 'push-main'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-proxy') && 'proxy-release'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release') && 'storage-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-compute') && 'compute-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-proxy') && 'proxy-rc-pr'
|| (inputs.github-event-name == 'pull_request') && 'pr'
|| 'unknown'
}}
run: |
echo "run-kind=$RUN_KIND" | tee -a $GITHUB_OUTPUT
- name: Get build tag
id: build-tag
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CURRENT_BRANCH: ${{ github.head_ref || github.ref_name }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
RUN_KIND: ${{ steps.run-kind.outputs.run-kind }}
run: |
case $RUN_KIND in
push-main)
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
storage-release)
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
proxy-release)
echo "tag=release-proxy-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
compute-release)
echo "tag=release-compute-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
pr|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
;;
*)
echo "Unexpected RUN_KIND ('${RUN_KIND}'), failing to assign build-tag!"
exit 1
esac
- name: Get the previous release-tags
id: previous-releases
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
gh api --paginate \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
"/repos/${GITHUB_REPOSITORY}/releases" \
| jq -f .github/scripts/previous-releases.jq -r \
| tee -a "${GITHUB_OUTPUT}"

View File

@@ -51,7 +51,7 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
sparse-checkout: .github/scripts/push_with_image_map.py
sparse-checkout: scripts/push_with_image_map.py
sparse-checkout-cone-mode: false
- name: Print image-map
@@ -99,6 +99,6 @@ jobs:
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Copy docker images to target registries
run: python3 .github/scripts/push_with_image_map.py
run: python scripts/push_with_image_map.py
env:
IMAGE_MAP: ${{ inputs.image-map }}

View File

@@ -140,7 +140,6 @@ jobs:
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
--ignore test_runner/performance/test_perf_ingest_using_pgcopydb.py
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -172,61 +171,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
cumstats-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_cumulative_statistics_persistence.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
replication-tests:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -65,11 +65,38 @@ jobs:
token: ${{ secrets.GITHUB_TOKEN }}
filters: .github/file-filters.yaml
meta:
tag:
needs: [ check-permissions ]
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
needs: [ check-permissions ]
@@ -172,7 +199,7 @@ jobs:
secrets: inherit
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
@@ -186,7 +213,7 @@ jobs:
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.meta.outputs.build-tag }}
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds.
# Run without LFC on v17 release and debug builds only. For all the other cases LFC is enabled.
@@ -470,24 +497,13 @@ jobs:
})
trigger-e2e-tests:
# Depends on jobs that can get skipped
if: >-
${{
(
!github.event.pull_request.draft
|| contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft')
|| contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind)
) && !failure() && !cancelled()
}}
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, meta ]
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute' }}
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, tag ]
uses: ./.github/workflows/trigger-e2e-tests.yml
with:
github-event-name: ${{ github.event_name }}
secrets: inherit
neon-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ check-permissions, build-build-tools-image, tag ]
strategy:
matrix:
arch: [ x64, arm64 ]
@@ -523,7 +539,7 @@ jobs:
build-args: |
ADDITIONAL_RUSTFLAGS=${{ matrix.arch == 'arm64' && '-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1' || '' }}
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG=${{ needs.meta.outputs.build-tag }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
DEBIAN_VERSION=bookworm
provenance: false
@@ -533,11 +549,10 @@ jobs:
cache-from: type=registry,ref=cache.neon.build/neon:cache-bookworm-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/neon:cache-{0}-{1},mode=max', 'bookworm', matrix.arch) || '' }}
tags: |
neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-${{ matrix.arch }}
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-${{ matrix.arch }}
neon-image:
needs: [ neon-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ neon-image-arch, tag ]
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -552,14 +567,13 @@ jobs:
- name: Create multi-arch image
run: |
docker buildx imagetools create -t neondatabase/neon:${{ needs.meta.outputs.build-tag }} \
-t neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm \
neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-x64 \
neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-arm64
docker buildx imagetools create -t neondatabase/neon:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm \
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-x64 \
neondatabase/neon:${{ needs.tag.outputs.build-tag }}-bookworm-arm64
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ check-permissions, build-build-tools-image, tag ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -617,7 +631,7 @@ jobs:
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.build-tag }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
@@ -627,7 +641,7 @@ jobs:
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-node-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
tags: |
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version.pg >= 'v16'
@@ -637,7 +651,7 @@ jobs:
build-args: |
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.build-tag }}
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
@@ -647,11 +661,10 @@ jobs:
target: extension-tests
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
tags: |
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.meta.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
compute-node-image:
needs: [ compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ compute-node-image-arch, tag ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -679,22 +692,21 @@ jobs:
- name: Create multi-arch compute-node image
run: |
docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
docker buildx imagetools create -t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
- name: Create multi-arch neon-test-extensions image
if: matrix.version.pg >= 'v16'
run: |
docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
docker buildx imagetools create -t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-t neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
vm-compute-node-image:
needs: [ check-permissions, meta, compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ check-permissions, tag, compute-node-image ]
runs-on: [ self-hosted, large ]
strategy:
fail-fast: false
@@ -730,25 +742,23 @@ jobs:
# it won't have the proper authentication (written at v0.6.0)
- name: Pulling compute-node image
run: |
docker pull neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}
docker pull neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
- name: Build vm image
run: |
./vm-builder \
-size=2G \
-spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-target-arch=linux/amd64
- name: Pushing vm-compute-node image
run: |
docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}
docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
test-images:
needs: [ check-permissions, meta, neon-image, compute-node-image ]
# Depends on jobs that can get skipped
if: "!failure() && !cancelled()"
needs: [ check-permissions, tag, neon-image, compute-node-image ]
strategy:
fail-fast: false
matrix:
@@ -766,6 +776,17 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Get the last compute release tag
id: get-last-compute-release-tag
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
tag=$(gh api -q '[.[].tag_name | select(startswith("release-compute"))][0]'\
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
"/repos/${{ github.repository }}/releases")
echo tag=${tag} >> ${GITHUB_OUTPUT}
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
# Regular pageserver version string looks like
@@ -775,9 +796,8 @@ jobs:
# Ensure that we don't have bad versions.
- name: Verify image versions
shell: bash # ensure no set -e for better error messages
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
run: |
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.meta.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.tag.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
echo "Pageserver version string: $pageserver_version"
@@ -794,24 +814,7 @@ jobs:
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
env:
TAG: >-
${{
contains(fromJSON('["compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.previous-storage-release
|| needs.meta.outputs.build-tag
}}
COMPUTE_TAG: >-
${{
contains(fromJSON('["storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.previous-compute-release
|| needs.meta.outputs.build-tag
}}
TEST_EXTENSIONS_TAG: >-
${{
contains(fromJSON('["storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
&& 'latest'
|| needs.meta.outputs.build-tag
}}
TAG: ${{needs.tag.outputs.build-tag}}
TEST_VERSION_ONLY: ${{ matrix.pg_version }}
run: ./docker-compose/docker_compose_test.sh
@@ -823,17 +826,10 @@ jobs:
- name: Test extension upgrade
timeout-minutes: 20
if: ${{ contains(fromJSON('["pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ needs.tag.outputs.build-tag == github.run_id }}
env:
TAG: >-
${{
false
|| needs.meta.outputs.run-kind == 'pr' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'compute-rc-pr' && needs.meta.outputs.previous-storage-release
}}
TEST_EXTENSIONS_TAG: ${{ needs.meta.outputs.previous-compute-release }}
NEW_COMPUTE_TAG: ${{ needs.meta.outputs.build-tag }}
OLD_COMPUTE_TAG: ${{ needs.meta.outputs.previous-compute-release }}
NEWTAG: ${{ needs.tag.outputs.build-tag }}
OLDTAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
run: ./docker-compose/test_extensions_upgrade.sh
- name: Print logs and clean up
@@ -843,7 +839,7 @@ jobs:
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml down
generate-image-maps:
needs: [ meta ]
needs: [ tag ]
runs-on: ubuntu-22.04
outputs:
neon-dev: ${{ steps.generate.outputs.neon-dev }}
@@ -853,14 +849,14 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
sparse-checkout: .github/scripts/generate_image_maps.py
sparse-checkout: scripts/generate_image_maps.py
sparse-checkout-cone-mode: false
- name: Generate Image Maps
id: generate
run: python3 .github/scripts/generate_image_maps.py
run: python scripts/generate_image_maps.py
env:
BUILD_TAG: "${{ needs.meta.outputs.build-tag }}"
BUILD_TAG: "${{ needs.tag.outputs.build-tag }}"
BRANCH: "${{ github.ref_name }}"
DEV_ACR: "${{ vars.AZURE_DEV_REGISTRY_NAME }}"
PROD_ACR: "${{ vars.AZURE_PROD_REGISTRY_NAME }}"
@@ -869,8 +865,7 @@ jobs:
AWS_REGION: "${{ vars.AWS_ECR_REGION }}"
push-neon-image-dev:
needs: [ meta, generate-image-maps, neon-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ generate-image-maps, neon-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -887,8 +882,7 @@ jobs:
secrets: inherit
push-compute-image-dev:
needs: [ meta, generate-image-maps, vm-compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ generate-image-maps, vm-compute-node-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -905,9 +899,8 @@ jobs:
secrets: inherit
push-neon-image-prod:
needs: [ meta, generate-image-maps, neon-image, test-images ]
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
needs: [ generate-image-maps, neon-image, test-images ]
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -924,9 +917,8 @@ jobs:
secrets: inherit
push-compute-image-prod:
needs: [ meta, generate-image-maps, vm-compute-node-image, test-images ]
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && needs.meta.outputs.run-kind == 'compute-release' }}
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
needs: [ generate-image-maps, vm-compute-node-image, test-images ]
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -945,19 +937,18 @@ jobs:
# This is a bit of a special case so we're not using a generated image map.
add-latest-tag-to-neon-extensions-test-image:
if: github.ref_name == 'main'
needs: [ meta, compute-node-image ]
needs: [ tag, compute-node-image ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
image-map: |
{
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.meta.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v16:latest"],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.meta.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v17:latest"]
"docker.io/neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v16:latest"],
"docker.io/neondatabase/neon-test-extensions-v17:${{ needs.tag.outputs.build-tag }}": ["docker.io/neondatabase/neon-test-extensions-v17:latest"]
}
secrets: inherit
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
needs: [ check-permissions, tag ]
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -992,7 +983,7 @@ jobs:
\"ci_job_name\": \"build-and-upload-extensions\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"compute_image_tag\": \"${{ needs.meta.outputs.build-tag }}\",
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"remote_branch_name\": \"${{ github.ref_name }}\"
}
}"
@@ -1036,9 +1027,9 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
needs: [ check-permissions, push-neon-image-prod, push-compute-image-prod, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-to-acr-dev` and `push-to-acr-prod`
if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute') && !failure() && !cancelled()
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -1049,103 +1040,108 @@ jobs:
- uses: actions/checkout@v4
- name: Create git tag and GitHub release
if: ${{ contains(fromJSON('["storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) }}
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
uses: actions/github-script@v7
env:
TAG: "${{ needs.meta.outputs.build-tag }}"
BRANCH: "${{ github.ref_name }}"
PREVIOUS_RELEASE: >-
${{
false
|| needs.meta.outputs.run-kind == 'storage-release' && needs.meta.outputs.previous-storage-release
|| needs.meta.outputs.run-kind == 'proxy-release' && needs.meta.outputs.previous-proxy-release
|| needs.meta.outputs.run-kind == 'compute-release' && needs.meta.outputs.previous-compute-release
|| 'unknown'
}}
with:
retries: 5
script: |
const { TAG, BRANCH, PREVIOUS_RELEASE } = process.env
const tag = "${{ needs.tag.outputs.build-tag }}";
const branch = "${{ github.ref_name }}";
try {
const existingRef = await github.rest.git.getRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: `tags/${TAG}`,
ref: `tags/${tag}`,
});
if (existingRef.data.object.sha !== context.sha) {
throw new Error(`Tag ${TAG} already exists but points to a different commit (expected: ${context.sha}, actual: ${existingRef.data.object.sha}).`);
throw new Error(`Tag ${tag} already exists but points to a different commit (expected: ${context.sha}, actual: ${existingRef.data.object.sha}).`);
}
console.log(`Tag ${TAG} already exists and points to ${context.sha} as expected.`);
console.log(`Tag ${tag} already exists and points to ${context.sha} as expected.`);
} catch (error) {
if (error.status !== 404) {
throw error;
}
console.log(`Tag ${TAG} does not exist. Creating it...`);
console.log(`Tag ${tag} does not exist. Creating it...`);
await github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: `refs/tags/${TAG}`,
ref: `refs/tags/${tag}`,
sha: context.sha,
});
console.log(`Tag ${TAG} created successfully.`);
console.log(`Tag ${tag} created successfully.`);
}
try {
const existingRelease = await github.rest.repos.getReleaseByTag({
owner: context.repo.owner,
repo: context.repo.repo,
tag: TAG,
tag: tag,
});
console.log(`Release for tag ${TAG} already exists (ID: ${existingRelease.data.id}).`);
console.log(`Release for tag ${tag} already exists (ID: ${existingRelease.data.id}).`);
} catch (error) {
if (error.status !== 404) {
throw error;
}
console.log(`Release for tag ${TAG} does not exist. Creating it...`);
console.log(`Release for tag ${tag} does not exist. Creating it...`);
// Find the PR number using the commit SHA
const pullRequests = await github.rest.pulls.list({
owner: context.repo.owner,
repo: context.repo.repo,
state: 'closed',
base: BRANCH,
base: branch,
});
const pr = pullRequests.data.find(pr => pr.merge_commit_sha === context.sha);
const prNumber = pr ? pr.number : null;
// Find the previous release on the branch
const releases = await github.rest.repos.listReleases({
owner: context.repo.owner,
repo: context.repo.repo,
per_page: 100,
});
const branchReleases = releases.data
.filter((release) => {
const regex = new RegExp(`^${branch}-\\d+$`);
return regex.test(release.tag_name) && !release.draft && !release.prerelease;
})
.sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
const previousTag = branchReleases.length > 0 ? branchReleases[0].tag_name : null;
const releaseNotes = [
prNumber
? `Release PR https://github.com/${context.repo.owner}/${context.repo.repo}/pull/${prNumber}.`
: 'Release PR not found.',
`Diff with the previous release https://github.com/${context.repo.owner}/${context.repo.repo}/compare/${PREVIOUS_RELEASE}...${TAG}.`
previousTag
? `Diff with the previous release https://github.com/${context.repo.owner}/${context.repo.repo}/compare/${previousTag}...${tag}.`
: `No previous release found on branch ${branch}.`,
].join('\n\n');
await github.rest.repos.createRelease({
owner: context.repo.owner,
repo: context.repo.repo,
tag_name: TAG,
tag_name: tag,
body: releaseNotes,
});
console.log(`Release for tag ${TAG} created successfully.`);
console.log(`Release for tag ${tag} created successfully.`);
}
- name: Trigger deploy workflow
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
RUN_KIND: ${{ needs.meta.outputs.run-kind }}
run: |
case ${RUN_KIND} in
push-main)
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.meta.outputs.build-tag}} -f deployPreprodRegion=false
;;
storage-release)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
@@ -1153,7 +1149,7 @@ jobs:
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.meta.outputs.build-tag}} \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \
@@ -1161,9 +1157,8 @@ jobs:
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.meta.outputs.build-tag}}
;;
proxy-release)
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
@@ -1171,7 +1166,7 @@ jobs:
-f deployStorageBroker=false \
-f deployStorageController=false \
-f branch=main \
-f dockerTag=${{needs.meta.outputs.build-tag}} \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/infra run deploy-proxy-prod.yml --ref main \
@@ -1181,16 +1176,13 @@ jobs:
-f deployProxyScram=true \
-f deployProxyAuthBroker=true \
-f branch=main \
-f dockerTag=${{needs.meta.outputs.build-tag}}
;;
compute-release)
gh workflow --repo neondatabase/infra run deploy-compute-dev.yml --ref main -f dockerTag=${{needs.meta.outputs.build-tag}}
;;
*)
echo "RUN_KIND (value '${RUN_KIND}') is not set to either 'push-main', 'storage-release', 'proxy-release' or 'compute-release'"
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
gh workflow --repo neondatabase/infra run deploy-compute-dev.yml --ref main -f dockerTag=${{needs.tag.outputs.build-tag}}
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main', 'release', 'release-proxy' or 'release-compute'"
exit 1
;;
esac
fi
notify-storage-release-deploy-failure:
needs: [ deploy ]
@@ -1215,7 +1207,7 @@ jobs:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: read
# `!failure() && !cancelled()` is required because the workflow transitively depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
# `!failure() && !cancelled()` is required because the workflow transitively depends on the job that can be skipped: `push-to-acr-dev` and `push-to-acr-prod`
if: github.ref_name == 'release' && !failure() && !cancelled()
runs-on: ubuntu-22.04
@@ -1305,8 +1297,7 @@ jobs:
pin-build-tools-image:
needs: [ build-build-tools-image, test-images, build-and-test-locally ]
# `!failure() && !cancelled()` is required because the job (transitively) depends on jobs that can be skipped
if: github.ref_name == 'main' && !failure() && !cancelled()
if: github.ref_name == 'main'
uses: ./.github/workflows/pin-build-tools-image.yml
with:
from-tag: ${{ needs.build-build-tools-image.outputs.image-tag }}
@@ -1325,7 +1316,6 @@ jobs:
# Format `needs` differently to make the list more readable.
# Usually we do `needs: [...]`
needs:
- meta
- build-and-test-locally
- check-codestyle-python
- check-codestyle-rust
@@ -1349,7 +1339,7 @@ jobs:
|| needs.check-codestyle-python.result == 'skipped'
|| needs.check-codestyle-rust.result == 'skipped'
|| needs.files-changed.result == 'skipped'
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.push-compute-image-dev.result == 'skipped'
|| needs.push-neon-image-dev.result == 'skipped'
|| needs.test-images.result == 'skipped'
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.trigger-custom-extensions-build-and-wait.result == 'skipped'

View File

@@ -71,7 +71,7 @@ jobs:
uses: ./.github/workflows/build-macos.yml
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ fromJson(needs.files-changed.outputs.rebuild_rust_code) }}
rebuild_rust_code: ${{ needs.files-changed.outputs.rebuild_rust_code }}
rebuild_everything: ${{ fromJson(needs.files-changed.outputs.rebuild_everything) }}
gather-rust-build-stats:

View File

@@ -5,10 +5,6 @@ on:
types:
- ready_for_review
workflow_call:
inputs:
github-event-name:
type: string
required: true
defaults:
run:
@@ -23,7 +19,7 @@ jobs:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
github-event-name: ${{ github.event_name }}
cancel-previous-e2e-tests:
needs: [ check-permissions ]
@@ -39,29 +35,46 @@ jobs:
run cancel-previous-in-concurrency-group.yml \
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
meta:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ inputs.github-event-name || github.event_name }}
tag:
needs: [ check-permissions ]
runs-on: ubuntu-22.04
outputs:
build-tag: ${{ steps.build-tag.outputs.tag }}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get build tag
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
CURRENT_BRANCH: ${{ github.head_ref || github.ref_name }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
fi
id: build-tag
trigger-e2e-tests:
needs: [ meta ]
needs: [ tag ]
runs-on: ubuntu-22.04
env:
EVENT_ACTION: ${{ github.event.action }}
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
TAG: >-
${{
contains(fromJSON('["compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.previous-storage-release
|| needs.meta.outputs.build-tag
}}
COMPUTE_TAG: >-
${{
contains(fromJSON('["storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind)
&& needs.meta.outputs.previous-compute-release
|| needs.meta.outputs.build-tag
}}
TAG: ${{ needs.tag.outputs.build-tag }}
steps:
- name: Wait for `push-{neon,compute}-image-dev` job to finish
# It's important to have a timeout here, the script in the step can run infinitely
@@ -144,6 +157,6 @@ jobs:
--raw-field "commit_hash=$COMMIT_SHA" \
--raw-field "remote_repo=${GITHUB_REPOSITORY}" \
--raw-field "storage_image_tag=${TAG}" \
--raw-field "compute_image_tag=${COMPUTE_TAG}" \
--raw-field "compute_image_tag=${TAG}" \
--raw-field "concurrency_group=${E2E_CONCURRENCY_GROUP}" \
--raw-field "e2e-platforms=${E2E_PLATFORMS}"

40
Cargo.lock generated
View File

@@ -984,9 +984,9 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.71.1"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 2.8.0",
"cexpr",
@@ -997,7 +997,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"rustc-hash 2.1.1",
"rustc-hash",
"shlex",
"syn 2.0.90",
]
@@ -1342,9 +1342,7 @@ dependencies = [
"tokio-util",
"tower 0.5.2",
"tower-http",
"tower-otel",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-utils",
"url",
@@ -3537,7 +3535,7 @@ dependencies = [
"measured-derive",
"memchr",
"parking_lot 0.12.1",
"rustc-hash 1.1.0",
"rustc-hash",
"ryu",
]
@@ -4486,18 +4484,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.1.9"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d"
checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.9"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67"
checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
@@ -5012,7 +5010,7 @@ dependencies = [
"reqwest-tracing",
"rsa",
"rstest",
"rustc-hash 1.1.0",
"rustc-hash",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
@@ -5630,12 +5628,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.0"
@@ -7302,20 +7294,6 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-otel"
version = "0.2.0"
source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd"
dependencies = [
"http 1.1.0",
"opentelemetry",
"pin-project",
"tower-layer",
"tower-service",
"tracing",
"tracing-opentelemetry",
]
[[package]]
name = "tower-service"
version = "0.3.3"

View File

@@ -43,7 +43,7 @@ members = [
]
[workspace.package]
edition = "2024"
edition = "2021"
license = "Apache-2.0"
## All dependency versions, used in the project
@@ -70,7 +70,7 @@ aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"
bindgen = "0.70"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
@@ -193,10 +193,6 @@ toml_edit = "0.22"
tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]}
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"

View File

@@ -46,9 +46,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true

View File

@@ -53,7 +53,6 @@ use compute_tools::compute::{
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::server::Server;
use compute_tools::logger::*;
@@ -61,11 +60,10 @@ use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info, warn};
use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
@@ -172,7 +170,11 @@ fn main() -> Result<()> {
let compute = wait_spec(build_tag, &cli, cli_spec)?;
start_postgres(&cli, compute)?
bootstrap_compute(
compute,
#[cfg(target_os = "linux")]
&cli,
)
// Startup is finished, exit the startup tracing span
};
@@ -348,6 +350,8 @@ fn wait_spec(
ext_remote_storage: cli.remote_ext_config.clone(),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs.clone(),
};
let compute = Arc::new(compute_node);
@@ -400,37 +404,22 @@ fn wait_spec(
Ok(compute)
}
fn start_postgres(
cli: &Cli,
// Start Postgres and some aux threads like various monitors
fn bootstrap_compute(
compute: Arc<ComputeNode>,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
#[cfg(target_os = "linux")] cli: &Cli,
) -> (Option<PostgresHandle>, StartPostgresResult) {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
// Create a tracing span for the startup operation.
//
// We could otherwise just annotate the function with #[instrument], but if
// we're being configured from a /configure HTTP request, we want the
// startup to be considered part of the /configure request.
let _this_entered = {
// Temporarily enter the /configure request's span, so that the new span
// becomes its child.
let _parent_entered = state.startup_span.take().map(|p| p.entered());
tracing::info_span!("start_postgres")
}
.entered();
state.set_status(ComputeStatus::Init, &compute.state_changed);
info!(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
// before we release the mutex, fetch some parameters for later.
// Before we release the mutex, fetch some parameters for later.
let &ComputeSpec {
swap_size_bytes,
disk_quota_bytes,
#[cfg(target_os = "linux")]
disable_lfc_resizing,
..
@@ -441,120 +430,86 @@ fn start_postgres(
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let mut prestartup_failed = false;
let mut delay_exit = false;
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute() {
Ok(pg) => {
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
Some(pg)
match compute.start_compute() {
Ok(pg) => {
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
// because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// Don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
}
}
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
delay_exit = true;
None
(
Some(pg),
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
)
}
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
delay_exit = true;
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
(
None,
StartPostgresResult {
delay_exit,
compute,
token,
vm_monitor: None,
},
)
} else {
(None, StartPostgresResult {
delay_exit,
compute,
})
}
}
};
} else {
warn!("skipping postgres startup because pre-startup step failed");
}
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
// because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
}
}
Ok((
pg,
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
))
}
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -12,7 +13,9 @@ use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent};
use compute_api::spec::{
ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
@@ -28,12 +31,23 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use crate::disk_quota::set_disk_quota;
use crate::installed_extensions::get_installed_extensions;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -79,6 +93,10 @@ pub struct ComputeNode {
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub build_tag: String,
/// Initialized from cli.resize_swap_on_bind
pub resize_swap_on_bind: bool,
/// Initialized from cli.set_disk_quota_for_fs
pub set_disk_quota_for_fs: Option<String>,
}
// store some metrics about download size that might impact startup time
@@ -97,23 +115,7 @@ pub struct ComputeState {
/// compute wasn't used since start.
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
/// Compute spec. This can be received from the CLI or - more likely -
/// passed by the control plane with a /configure HTTP request.
pub pspec: Option<ParsedSpec>,
/// If the spec is passed by a /configure request, 'startup_span' is the
/// /configure request's tracing span. The main thread enters it when it
/// processes the compute startup, so that the compute startup is considered
/// to be part of the /configure request for tracing purposes.
///
/// If the request handling thread/task called startup_compute() directly,
/// it would automatically be a child of the request handling span, and we
/// wouldn't need this. But because we use the main thread to perform the
/// startup, and the /configure task just waits for it to finish, we need to
/// set up the span relationship ourselves.
pub startup_span: Option<tracing::span::Span>,
pub metrics: ComputeMetrics,
}
@@ -125,7 +127,6 @@ impl ComputeState {
last_active: None,
error: None,
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
}
}
@@ -775,9 +776,8 @@ impl ComputeNode {
Ok(())
}
/// Start Postgres as a child process and wait for it to start accepting
/// connections.
///
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
/// Returns a handle to the child process and a handle to the logs thread.
#[instrument(skip_all)]
pub fn start_postgres(
@@ -915,6 +915,388 @@ impl ComputeNode {
Ok(client)
}
/// Apply the spec to the running PostgreSQL instance.
/// The caller can decide to run with multiple clients in parallel, or
/// single mode. Either way, the commands executed will be the same, and
/// only commands run in different databases are parallelized.
#[instrument(skip_all)]
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
.await?
.into_iter()
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();
// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when endpoint starts on a new branch.
// Otherwise, we may drop not inherited, but newly created subscriptions.
//
// We cannot rely only on spec.drop_subscriptions_before_start flag,
// because if for some reason compute restarts inside VM,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If table exists, we check if the operation was performed on the current timelilne.
//
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.simple_query(&query).await {
Ok(result) => {
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
},
Err(e) =>
{
match e.code() {
Some(&SqlState::UNDEFINED_TABLE) => false,
_ => {
// We don't expect any other error here, except for the schema/table not existing
error!("Error checking if drop subscription operation was already performed: {}", e);
return Err(e.into());
}
}
}
}
};
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles,
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
);
Ok(spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
] {
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
info!("Applying RunInEachDatabase2 phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
.cluster
.databases
.iter()
.map(|db| DB::new(db.clone()))
// include
.chain(once(DB::SystemDB))
.map(|db| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
phases,
);
Ok(spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
handle.await??;
}
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
RunInEachDatabase {
db: db.clone(),
subphase,
},
// Only connect if apply_operation actually wants a connection.
// It's quite possible this database doesn't need any queries,
// so by not connecting we save time and effort connecting to
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
Ok(client)
},
)
.await?;
}
drop(client_conn);
Ok::<(), anyhow::Error>(())
}
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are
// already connected to the cluster, and high concurrency could negatively
// impact user connectivity. Therefore, we can limit concurrency to the number of
// reserved superuser connections, which users wouldn't be able to use anyway.
spec.cluster
.settings
.find("superuser_reserved_connections")
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
}
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
@@ -1052,12 +1434,59 @@ impl ComputeNode {
Ok(())
}
/// Configure some VM parameters like swap and disk quota
#[instrument(skip_all)]
pub fn configure_vm(&self, spec: &ComputeSpec) -> Result<()> {
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (spec.swap_size_bytes, self.resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
error!("{err:#}");
return Err(err);
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(spec.disk_quota_bytes, self.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
return Err(err);
}
}
}
Ok(())
}
#[instrument(skip_all)]
pub fn start_compute(
&self,
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
@@ -1066,9 +1495,11 @@ impl ComputeNode {
pspec.timeline_id,
);
// tune pgbouncer
self.configure_vm(&pspec.spec)?;
// Configure pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
info!("configuring pgbouncer");
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
@@ -1094,14 +1525,11 @@ impl ComputeNode {
});
}
info!(
"start_compute spec.remote_extensions {:?}",
pspec.spec.remote_extensions
);
// This part is sync, because we need to download
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
info!(?remote_extensions, "processing remote extensions");
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
@@ -1120,7 +1548,7 @@ impl ComputeNode {
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
info!(
"Loading shared_preload_libraries took {:?}ms",
"loading shared_preload_libraries from remote extensions took {:?}ms",
library_load_time
);
info!("{:?}", remote_ext_metrics);
@@ -1179,6 +1607,7 @@ impl ComputeNode {
});
}
let metrics: ComputeMetrics;
let startup_end_time = Utc::now();
{
let mut state = self.state.lock().unwrap();
@@ -1197,21 +1626,17 @@ impl ComputeNode {
.to_std()
.unwrap()
.as_millis() as u64;
metrics = state.metrics.clone();
}
self.set_status(ComputeStatus::Running);
// Log metrics so that we can search for slow operations in logs
info!(
?metrics,
"finished configuration of compute for project {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
);
// Log metrics so that we can search for slow operations in logs
let metrics = {
let state = self.state.lock().unwrap();
state.metrics.clone()
};
info!(?metrics, "compute start finished");
Ok(pg_process)
}

View File

@@ -45,18 +45,13 @@ pub(in crate::http) async fn configure(
return JsonResponse::invalid_status(state.status);
}
// Pass the tracing span to the main thread that performs the startup,
// so that the start_compute operation is considered a child of this
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
state.pspec = Some(pspec);
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
drop(state);
}
// Spawn a blocking thread to wait for compute to become Running. This is
// needed to not block the main pool of workers and to be able to serve
// needed to do not block the main pool of workers and be able to serve
// other requests while some particular request is waiting for compute to
// finish configuration.
let c = compute.clone();

View File

@@ -121,7 +121,6 @@ impl From<Server> for Router<Arc<ComputeNode>> {
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
}
}

View File

@@ -4,413 +4,15 @@ use std::future::Future;
use std::iter::{empty, once};
use std::sync::Arc;
use anyhow::{Context, Result};
use compute_api::responses::ComputeStatus;
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use tracing::{Instrument, debug, info_span, warn};
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
impl ComputeNode {
/// Apply the spec to the running PostgreSQL instance.
/// The caller can decide to run with multiple clients in parallel, or
/// single mode. Either way, the commands executed will be the same, and
/// only commands run in different databases are parallelized.
#[instrument(skip_all)]
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
.await?
.into_iter()
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();
// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when endpoint starts on a new branch.
// Otherwise, we may drop not inherited, but newly created subscriptions.
//
// We cannot rely only on spec.drop_subscriptions_before_start flag,
// because if for some reason compute restarts inside VM,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If table exists, we check if the operation was performed on the current timelilne.
//
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.simple_query(&query).await {
Ok(result) => {
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
},
Err(e) =>
{
match e.code() {
Some(&SqlState::UNDEFINED_TABLE) => false,
_ => {
// We don't expect any other error here, except for the schema/table not existing
error!("Error checking if drop subscription operation was already performed: {}", e);
return Err(e.into());
}
}
}
}
};
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles,
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
] {
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
info!("Applying RunInEachDatabase2 phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
.cluster
.databases
.iter()
.map(|db| DB::new(db.clone()))
// include
.chain(once(DB::SystemDB))
.map(|db| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
phases,
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
handle.await??;
}
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
RunInEachDatabase {
db: db.clone(),
subphase,
},
// Only connect if apply_operation actually wants a connection.
// It's quite possible this database doesn't need any queries,
// so by not connecting we save time and effort connecting to
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
Ok(client)
},
)
.await?;
}
drop(client_conn);
Ok::<(), anyhow::Error>(())
}
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are
// already connected to the cluster, and high concurrency could negatively
// impact user connectivity. Therefore, we can limit concurrency to the number of
// reserved superuser connections, which users wouldn't be able to use anyway.
spec.cluster
.settings
.find("superuser_reserved_connections")
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
}
}
}
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
#[derive(Clone)]
pub enum DB {

View File

@@ -25,7 +25,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno;
use nix::fcntl::{FcntlArg, FdFlag};
use nix::sys::signal::{Signal, kill};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use utils::pid_file::{self, PidFileRead};

View File

@@ -5,16 +5,7 @@
//! easier to work with locally. The python tests in `test_runner`
//! rely on `neon_local` to set up the environment for each test.
//!
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use anyhow::{anyhow, bail, Context, Result};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
@@ -28,7 +19,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{FlockArg, flock};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -44,13 +35,23 @@ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::project_git_version;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
project_git_version,
};
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
@@ -920,9 +921,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
// User (likely the Python test suite) provided a description of the environment.
if args.num_pageservers.is_some() {
bail!(
"Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
);
bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
}
// load and parse the file
let contents = std::fs::read_to_string(config_path).with_context(|| {
@@ -1316,14 +1315,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
match (mode, args.hot_standby) {
(ComputeMode::Static(_), true) => {
bail!(
"Cannot start a node in hot standby mode when it is already configured as a static replica"
)
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(ComputeMode::Primary, true) => {
bail!(
"Cannot start a node as a hot standby replica, it is already configured as primary node"
)
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}

View File

@@ -8,6 +8,7 @@
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};

View File

@@ -37,20 +37,27 @@
//! ```
//!
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::{Context, Result, anyhow, bail};
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{
Cluster, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
@@ -62,6 +69,9 @@ use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
@@ -227,9 +237,7 @@ impl ComputeControlPlane {
});
if let Some((key, _)) = duplicates.next() {
bail!(
"attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
);
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
}
}
Ok(())

View File

@@ -3,22 +3,28 @@
//! Now it also provides init method which acts like a stub for proper installation
//! script which will use local paths.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{env, fs};
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 16;
@@ -459,9 +465,7 @@ impl LocalEnv {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!(
"branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
);
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));

View File

@@ -7,6 +7,7 @@
//! ```
//!
use std::collections::HashMap;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
@@ -14,19 +15,22 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, bail};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::id::NodeId;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::background_process;
use crate::local_env::{LocalEnv, NeonLocalInitPageserverConf, PageServerConf};
use crate::local_env::{NeonLocalInitPageserverConf, PageServerConf};
use crate::{background_process, local_env::LocalEnv};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
@@ -77,11 +81,7 @@ impl PageServerNode {
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(
&PageServerConf::from(&conf),
&self.conf,
"during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully"
);
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)

View File

@@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::fmt;
///
/// Module for parsing postgresql.conf file.
///
@@ -9,6 +6,8 @@ use std::fmt;
/// funny stuff like include-directives or funny escaping.
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]

View File

@@ -14,15 +14,18 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use http_utils::error::HttpErrorBody;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use crate::background_process;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::{
background_process,
local_env::{LocalEnv, SafekeeperConf},
};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {

View File

@@ -1,39 +1,44 @@
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use crate::{
background_process,
local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_api::models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
ffi::OsStr,
fs,
net::SocketAddr,
path::PathBuf,
process::ExitStatus,
str::FromStr,
sync::OnceLock,
time::{Duration, Instant},
};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::auth::{Claims, Scope, encode_from_key_file};
use utils::id::{NodeId, TenantId};
use utils::{
auth::{encode_from_key_file, Claims, Scope},
id::{NodeId, TenantId},
};
use whoami::username;
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Vec<u8>>,
@@ -91,8 +96,7 @@ pub struct AttachHookRequest {
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
#[serde(rename = "gen")]
pub generation: Option<u32>,
pub gen: Option<u32>,
}
#[derive(Serialize, Deserialize)]
@@ -775,7 +779,7 @@ impl StorageController {
)
.await?;
Ok(response.generation)
Ok(response.gen)
}
#[instrument(skip(self))]

View File

@@ -1,28 +1,35 @@
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use futures::StreamExt;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use clap::{Parser, Subcommand};
use futures::StreamExt;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;
#[derive(Subcommand, Debug)]
enum Command {
/// Register a pageserver with the storage controller. This shouldn't usually be necessary,
@@ -914,9 +921,7 @@ async fn main() -> anyhow::Result<()> {
}
Command::TenantDrop { tenant_id, unclean } => {
if !unclean {
anyhow::bail!(
"This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed."
)
anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
}
storcon_client
.dispatch::<(), ()>(
@@ -928,9 +933,7 @@ async fn main() -> anyhow::Result<()> {
}
Command::NodeDrop { node_id, unclean } => {
if !unclean {
anyhow::bail!(
"This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed."
)
anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
}
storcon_client
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)

View File

@@ -186,7 +186,7 @@ services:
neon-test-extensions:
profiles: ["test-extensions"]
image: ${REPOSITORY:-neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TEST_EXTENSIONS_TAG:-${TAG:-latest}}
image: ${REPOSITORY:-neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TAG:-latest}
environment:
- PGPASSWORD=cloud_admin
entrypoint:

View File

@@ -7,7 +7,7 @@ index f255fe6..0a0fa65 100644
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=contrib_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))

View File

@@ -6,13 +6,12 @@ generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
if [ -z ${OLD_COMPUTE_TAG+x} ] || [ -z ${NEW_COMPUTE_TAG+x} ] || [ -z "${OLD_COMPUTE_TAG}" ] || [ -z "${NEW_COMPUTE_TAG}" ]; then
echo OLD_COMPUTE_TAG and NEW_COMPUTE_TAG must be defined
if [ -z ${OLDTAG+x} ] || [ -z ${NEWTAG+x} ] || [ -z "${OLDTAG}" ] || [ -z "${NEWTAG}" ]; then
echo OLDTAG and NEWTAG must be defined
exit 1
fi
export PG_VERSION=${PG_VERSION:-16}
export PG_TEST_VERSION=${PG_VERSION}
# Waits for compute node is ready
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
@@ -24,45 +23,11 @@ function wait_for_ready {
exit 2
fi
}
# Creates extensions. Gets a string with space-separated extensions as a parameter
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
done
}
# Creates a new timeline. Gets the parent ID and an extension name as parameters.
# Saves the timeline ID in the variable EXT_TIMELINE
function create_timeline() {
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${1}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
EXT_TIMELINE[${2}]=${new_timeline_id}
}
# Checks if the timeline ID of the compute node is expected. Gets the timeline ID as a parameter
function check_timeline() {
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ "${TID}" != "${1}" ]; then
echo Timeline mismatch
exit 1
fi
}
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}
declare -A EXT_TIMELINE
EXTENSIONS='[
{"extname": "plv8", "extdir": "plv8-src"},
{"extname": "vector", "extdir": "pgvector-src"},
@@ -82,7 +47,7 @@ EXTENSIONS='[
{"extname": "pg_repack", "extdir": "pg_repack-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
COMPUTE_TAG=${NEW_COMPUTE_TAG} TEST_EXTENSIONS_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
@@ -90,14 +55,12 @@ create_extensions "${EXTNAMES}"
query="select json_object_agg(extname,extversion) from pg_extension where extname in ('${EXTNAMES// /\',\'}')"
new_vers=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
docker compose --profile test-extensions down
COMPUTE_TAG=${OLD_COMPUTE_TAG} TEST_EXTENSIONS_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
TAG=${OLDTAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
EXT_TIMELINE["main"]=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
create_timeline "${EXT_TIMELINE["main"]}" init
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE["init"]}"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"
@@ -108,13 +71,29 @@ fi
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
timeline_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
for ext in ${exts}; do
echo Testing ${ext}...
create_timeline "${EXT_TIMELINE["main"]}" ${ext}
EXTDIR=$(echo ${EXTENSIONS} | jq -r '.[] | select(.extname=="'${ext}'") | .extdir')
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "CREATE EXTENSION ${ext} CASCADE"
restart_compute "${NEW_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${timeline_id}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} TAG=${OLDTAG} docker compose down compute compute_is_ready
COMPUTE_TAG=${NEWTAG} TAG=${OLDTAG} TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ ${TID} != ${new_timeline_id} ]; then
echo Timeline mismatch
exit 1
fi
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs

View File

@@ -1,7 +1,7 @@
[package]
name = "consumption_metrics"
version = "0.1.0"
edition = "2024"
edition = "2021"
license = "Apache-2.0"
[dependencies]

View File

@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use parking_lot::{Mutex, MutexGuard};

View File

@@ -1,7 +1,11 @@
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
use std::sync::{Arc, OnceLock, mpsc};
use std::thread::JoinHandle;
use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
mpsc, Arc, OnceLock,
},
thread::JoinHandle,
};
use tracing::{debug, error, trace};

View File

@@ -1,19 +1,26 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::{self, Debug};
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use std::{
cmp::Ordering,
collections::{BinaryHeap, VecDeque},
fmt::{self, Debug},
ops::DerefMut,
sync::{mpsc, Arc},
};
use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
use parking_lot::{Mutex, RawMutex};
use parking_lot::{
lock_api::{MappedMutexGuard, MutexGuard},
Mutex, RawMutex,
};
use rand::rngs::StdRng;
use tracing::debug;
use super::chan::Chan;
use super::proto::AnyMessage;
use crate::executor::{self, ThreadContext};
use crate::options::NetworkOptions;
use crate::proto::{NetEvent, NodeEvent};
use crate::{
executor::{self, ThreadContext},
options::NetworkOptions,
proto::NetEvent,
proto::NodeEvent,
};
use super::{chan::Chan, proto::AnyMessage};
pub struct NetworkTask {
options: Arc<NetworkOptions>,

View File

@@ -2,11 +2,14 @@ use std::sync::Arc;
use rand::Rng;
use super::chan::Chan;
use super::network::TCP;
use super::world::{Node, NodeId, World};
use crate::proto::NodeEvent;
use super::{
chan::Chan,
network::TCP,
world::{Node, NodeId, World},
};
/// Abstraction with all functions (aka syscalls) available to the node.
#[derive(Clone)]
pub struct NodeOs {

View File

@@ -1,5 +1,4 @@
use rand::Rng;
use rand::rngs::StdRng;
use rand::{rngs::StdRng, Rng};
/// Describes random delays and failures. Delay will be uniformly distributed in [min, max].
/// Connection failure will occur with the probablity fail_prob.

View File

@@ -3,8 +3,7 @@ use std::fmt::Debug;
use bytes::Bytes;
use utils::lsn::Lsn;
use crate::network::TCP;
use crate::world::NodeId;
use crate::{network::TCP, world::NodeId};
/// Internal node events.
#[derive(Debug)]

View File

@@ -1,8 +1,12 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::{
cmp::Ordering,
collections::BinaryHeap,
ops::DerefMut,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
};
use parking_lot::Mutex;
use tracing::trace;

View File

@@ -1,18 +1,19 @@
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::Mutex;
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::{rngs::StdRng, SeedableRng};
use std::{
ops::DerefMut,
sync::{mpsc, Arc},
};
use super::chan::Chan;
use super::network::TCP;
use super::node_os::NodeOs;
use crate::executor::{ExternalHandle, Runtime};
use crate::network::NetworkTask;
use crate::options::NetworkOptions;
use crate::proto::{NodeEvent, SimEvent};
use crate::time::Timing;
use crate::{
executor::{ExternalHandle, Runtime},
network::NetworkTask,
options::NetworkOptions,
proto::{NodeEvent, SimEvent},
time::Timing,
};
use super::{chan::Chan, network::TCP, node_os::NodeOs};
pub type NodeId = u32;

View File

@@ -1,15 +1,14 @@
//! Simple test to verify that simulator is working.
#[cfg(test)]
mod reliable_copy_test {
use std::sync::Arc;
use anyhow::Result;
use desim::executor::{self, PollSome};
use desim::node_os::NodeOs;
use desim::options::{Delay, NetworkOptions};
use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
use desim::proto::{NetEvent, NodeEvent, ReplCell};
use desim::world::{NodeId, World};
use desim::{node_os::NodeOs, proto::AnyMessage};
use parking_lot::Mutex;
use std::sync::Arc;
use tracing::info;
/// Disk storage trait and implementation.

View File

@@ -1,29 +1,29 @@
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use ::pprof::ProfilerGuardBuilder;
use crate::error::{api_error_handler, route_error_handler, ApiError};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use ::pprof::protos::Message as _;
use anyhow::{Context, anyhow};
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use tracing::{debug, info, info_span, warn, Instrument};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
@@ -375,7 +375,7 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
));
))
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
@@ -539,8 +539,8 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
Some(request_id) => request_id
@@ -664,7 +664,7 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
));
))
}
}
}
@@ -717,13 +717,11 @@ pub fn check_permission_with(
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
use super::*;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use super::*;
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
#[tokio::test]
async fn test_request_id_returned() {

View File

@@ -1,10 +1,10 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use hyper::{Body, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{error, info, warn};
use utils::auth::AuthError;
#[derive(Debug, Error)]

View File

@@ -1,10 +1,11 @@
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

View File

@@ -1,6 +1,6 @@
use anyhow::Context;
use bytes::Buf;
use hyper::{Body, Request, Response, StatusCode, header};
use hyper::{header, Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use super::error::ApiError;

View File

@@ -9,4 +9,4 @@ extern crate hyper0 as hyper;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{RouterBuilder, RouterService, ext::RequestExt};
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};

View File

@@ -1,15 +1,15 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ffi::c_void;
use std::io::Write as _;
use anyhow::bail;
use flate2::Compression;
use flate2::write::{GzDecoder, GzEncoder};
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
let mut gz = GzDecoder::new(Vec::new());

View File

@@ -1,13 +1,10 @@
use core::fmt;
use std::borrow::Cow;
use std::str::FromStr;
use anyhow::anyhow;
use hyper::body::HttpBody;
use hyper::{Body, Request};
use routerify::ext::RequestExt;
use std::{borrow::Cow, str::FromStr};
use super::error::ApiError;
use anyhow::anyhow;
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,

View File

@@ -6,15 +6,17 @@
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
use std::hash::{BuildHasher, BuildHasherDefault, Hash};
use std::sync::atomic::AtomicU8;
use std::{
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::atomic::AtomicU8,
};
use measured::LabelGroup;
use measured::label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor};
use measured::metric::counter::CounterState;
use measured::metric::name::MetricNameEncoder;
use measured::metric::{Metric, MetricType, MetricVec};
use measured::text::TextEncoder;
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
text::TextEncoder,
LabelGroup,
};
use twox_hash::xxh3;
/// Create an [`HyperLogLogVec`] and registers to default registry.
@@ -25,7 +27,9 @@ macro_rules! register_hll_vec {
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) }};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
}
/// Create an [`HyperLogLog`] and registers to default registry.
@@ -36,7 +40,9 @@ macro_rules! register_hll {
$crate::register(Box::new(hll.clone())).map(|_| hll)
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) }};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP))
}};
}
/// HLL is a probabilistic cardinality measure.
@@ -189,10 +195,8 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
mod tests {
use std::collections::HashSet;
use measured::FixedCardinalityLabel;
use measured::label::StaticLabelSet;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use measured::{label::StaticLabelSet, FixedCardinalityLabel};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand_distr::{Distribution, Zipf};
use crate::HyperLogLogVec;

View File

@@ -1,10 +1,9 @@
//! A timestamp captured at process startup to identify restarts of the process, e.g., in logs and metrics.
use std::fmt::Display;
use chrono::Utc;
use super::register_uint_gauge;
use std::fmt::Display;
pub struct LaunchTimestamp(chrono::DateTime<Utc>);

View File

@@ -4,26 +4,38 @@
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels};
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::metric::name::{MetricName, MetricNameEncoder};
use measured::metric::{MetricEncoding, MetricFamilyEncoding};
use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup};
use measured::{
label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
FixedCardinalityLabel, LabelGroup, MetricGroup,
};
use once_cell::sync::Lazy;
use prometheus::Registry;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::{
Counter, CounterVec, Encoder, Error, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, TextEncoder, core, default_registry, exponential_buckets,
linear_buckets, opts, proto, register, register_counter_vec, register_gauge,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
use prometheus::Registry;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
pub mod launch_timestamp;
mod wrappers;

View File

@@ -4,28 +4,28 @@
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Poll, ready};
use std::{fmt, io};
use anyhow::Context;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN,
SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
/// An error, occurred during query processing:
/// either during the connection ([`ConnectionError`]) or before/after it.
@@ -746,7 +746,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
QueryError::SimulatedConnectionError => {
return Err(QueryError::SimulatedConnectionError);
return Err(QueryError::SimulatedConnectionError)
}
err @ QueryError::Reconnect => {
// Instruct the client to reconnect, stop processing messages
@@ -1020,9 +1020,7 @@ fn log_query_error(query: &str, e: &QueryError) {
}
}
QueryError::Disconnected(other_connection_error) => {
error!(
"query handler for '{query}' failed with connection error: {other_connection_error:?}"
)
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
}
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")

View File

@@ -1,11 +1,10 @@
use std::io::Cursor;
use std::sync::Arc;
/// Test postgres_backend_async with tokio_postgres
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use rustls::crypto::ring;
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;

View File

@@ -1,10 +1,9 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt;
use anyhow::{Context, bail};
use itertools::Itertools;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
@@ -30,9 +29,8 @@ pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>
#[cfg(test)]
mod tests_parse_host_port {
use url::Host;
use crate::parse_host_port;
use url::Host;
#[test]
fn test_normal() {
@@ -209,11 +207,10 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::PgConnectionConfig;
use once_cell::sync::Lazy;
use url::Host;
use crate::PgConnectionConfig;
static STUB_HOST: Lazy<Host> = Lazy::new(|| Host::Domain("stub.host.example".to_owned()));
#[test]

View File

@@ -1,6 +1,6 @@
use std::ffi::CStr;
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -4,7 +4,7 @@ use std::env;
use std::path::PathBuf;
use std::process::Command;
use anyhow::{Context, anyhow};
use anyhow::{anyhow, Context};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]

View File

@@ -21,9 +21,7 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(unsafe_op_in_unsafe_fn)]
#![allow(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::ptr_offset_with_cast)]
use serde::{Deserialize, Serialize};
include!(concat!(
@@ -45,7 +43,8 @@ macro_rules! postgres_ffi {
pub const PG_MAJORVERSION: &str = stringify!($version);
// Re-export some symbols from bindings
pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
@@ -222,17 +221,21 @@ pub mod relfile_utils;
pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
};
pub use v14::bindings::RepOriginId;
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -243,11 +246,13 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::try_from_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub use v14::xlog_utils::{
XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
try_from_pg_timestamp,
};
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
@@ -350,9 +355,8 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
}
pub mod waldecoder {
use std::num::NonZeroU32;
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;

View File

@@ -9,7 +9,8 @@
//! comments on them.
//!
use crate::{BLCKSZ, PageHeaderData};
use crate::PageHeaderData;
use crate::BLCKSZ;
//
// From pg_tablespace_d.h

View File

@@ -3,16 +3,18 @@
//!
//! TODO: Generate separate types for each supported PG version
use crate::pg_constants;
use crate::XLogRecord;
use crate::{
BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz,
TransactionId,
};
use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD};
use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
@@ -506,9 +508,8 @@ pub fn decode_wal_record(
}
pub mod v14 {
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
#[repr(C)]
#[derive(Debug)]
@@ -677,10 +678,9 @@ pub mod v15 {
}
pub mod v16 {
use bytes::{Buf, Bytes};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
pub struct XlHeapDelete {
pub xmax: TransactionId,
@@ -746,9 +746,8 @@ pub mod v16 {
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
#[repr(C)]
#[derive(Debug)]
@@ -859,14 +858,14 @@ pub mod v16 {
}
pub mod v17 {
pub use super::v14::XlHeapLockUpdated;
pub use crate::{TimeLineID, TimestampTz};
use bytes::{Buf, Bytes};
pub use super::v14::XlHeapLockUpdated;
pub use super::v16::rm_neon;
pub use super::v16::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
#[repr(C)]
#[derive(Debug)]

View File

@@ -1,9 +1,7 @@
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::*;
use clap::{Arg, ArgMatches, Command, value_parser};
use clap::{value_parser, Arg, ArgMatches, Command};
use postgres::Client;
use std::{path::PathBuf, str::FromStr};
use wal_craft::*;
fn main() -> Result<()> {

View File

@@ -1,18 +1,17 @@
use anyhow::{bail, ensure};
use camino_tempfile::{tempdir, Utf8TempDir};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use anyhow::{bail, ensure};
use camino_tempfile::{Utf8TempDir, tempdir};
use log::*;
use postgres::Client;
use postgres::types::PgLsn;
use postgres_ffi::{
WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]

View File

@@ -10,10 +10,11 @@
//! calls.
//!
//! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
use std::future::Future;
use std::io::{self, ErrorKind};
use bytes::{Buf, BytesMut};
use std::{
future::Future,
io::{self, ErrorKind},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};

View File

@@ -5,15 +5,14 @@
pub mod framed;
use std::borrow::Cow;
use std::{fmt, io, str};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
pub type Oid = u32;
pub type SystemId = u64;
@@ -207,8 +206,8 @@ use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.r#gen(),
cancel_key: rng.r#gen(),
backend_pid: rng.gen(),
cancel_key: rng.gen(),
}
}
}
@@ -1036,7 +1035,7 @@ impl BeMessage<'_> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_slice(rec.data);

View File

@@ -85,12 +85,12 @@ impl MemberSet {
Ok(MemberSet { m: members })
}
pub fn contains(&self, sk: NodeId) -> bool {
self.m.iter().any(|m| m.id == sk)
pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.m.iter().any(|m| m.id == sk.id)
}
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
if self.contains(sk.id) {
if self.contains(&sk) {
bail!(format!(
"sk {} is already member of the set {}",
sk.id, self
@@ -130,11 +130,6 @@ impl Configuration {
new_members: None,
}
}
/// Is `sk_id` member of the configuration?
pub fn contains(&self, sk_id: NodeId) -> bool {
self.members.contains(sk_id) || self.new_members.as_ref().is_some_and(|m| m.contains(sk_id))
}
}
impl Display for Configuration {

View File

@@ -130,7 +130,11 @@ impl StorageModel {
break;
}
}
if possible { Some(snapshot_later) } else { None }
if possible {
Some(snapshot_later)
} else {
None
}
} else {
None
};

View File

@@ -76,10 +76,7 @@ pub fn draw_svg(
let mut result = String::new();
writeln!(
result,
"<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">"
)?;
writeln!(result, "<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">")?;
draw.calculate_svg_layout();

View File

@@ -1,8 +1,8 @@
//! Tracing wrapper for Hyper HTTP server
use hyper0::HeaderMap;
use hyper0::{Body, Request, Response};
use std::future::Future;
use hyper0::{Body, HeaderMap, Request, Response};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

View File

@@ -36,11 +36,11 @@
pub mod http;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use tracing::Subscriber;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::log_slow;

View File

@@ -1,15 +1,12 @@
// For details about authentication see docs/authentication.md
use std::borrow::Cow;
use std::fmt::Display;
use std::fs;
use std::sync::Arc;
use arc_swap::ArcSwap;
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
use anyhow::Result;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use jsonwebtoken::{
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
@@ -132,9 +129,7 @@ impl JwtAuth {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!(
"Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected."
);
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
}
Ok(Self::new(decoding_keys))
}
@@ -180,9 +175,8 @@ pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String>
#[cfg(test)]
mod tests {
use std::str::FromStr;
use super::*;
use std::str::FromStr;
// Generated with:
//
@@ -221,9 +215,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
assert_eq!(claims_from_token, expected_claims);
}
@@ -238,9 +230,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
// decode it back
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let decoded = auth.decode(&encoded).unwrap();
assert_eq!(decoded.claims, claims);

View File

@@ -121,11 +121,9 @@ where
#[cfg(test)]
mod tests {
use std::io;
use tokio::sync::Mutex;
use super::*;
use std::io;
use tokio::sync::Mutex;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {

View File

@@ -13,11 +13,9 @@
#![warn(missing_docs)]
use std::io::{self, Read, Write};
use bincode::Options;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Serialize};
use std::io::{self, Read, Write};
use thiserror::Error;
/// An error that occurred during a deserialize operation
@@ -263,11 +261,9 @@ impl<T> LeSer for T {}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use serde::{Deserialize, Serialize};
use super::DeserializeError;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShortStruct {

View File

@@ -1,5 +1,7 @@
use std::fmt::Display;
use std::time::{Duration, Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};
use metrics::IntCounter;

View File

@@ -1,5 +1,4 @@
use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///

View File

@@ -1,7 +1,9 @@
use std::borrow::Cow;
use std::fs::{self, File};
use std::io::{self, Write};
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
};
use camino::{Utf8Path, Utf8PathBuf};

View File

@@ -1,7 +1,6 @@
//! Wrapper around `std::env::var` for parsing environment variables.
use std::fmt::Display;
use std::str::FromStr;
use std::{fmt::Display, str::FromStr};
/// For types `V` that implement [`FromStr`].
pub fn var<V, E>(varname: &str) -> Option<V>

View File

@@ -127,9 +127,6 @@ pub async fn failpoint_sleep_cancellable_helper(
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
@@ -137,10 +134,7 @@ pub fn init() -> fail::FailScenario<'static> {
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
std::env::remove_var("FAILPOINTS");
} else {
// let the library handle non-utf8, or nothing for not present
}

View File

@@ -58,9 +58,10 @@ where
#[cfg(test)]
mod test {
use super::ignore_absent_files;
use crate::fs_ext::{is_directory_empty, list_dir};
use super::ignore_absent_files;
#[test]
fn is_empty_dir() {
use super::PathExt;

View File

@@ -38,8 +38,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
#[cfg(test)]
mod test {
use std::fs;
use std::path::PathBuf;
use std::{fs, path::PathBuf};
use super::*;

View File

@@ -169,9 +169,9 @@ mod test {
];
let mut s = String::new();
for (line, gen_, expected) in examples {
for (line, gen, expected) in examples {
s.clear();
write!(s, "{}", &gen_.get_suffix()).expect("string grows");
write!(s, "{}", &gen.get_suffix()).expect("string grows");
assert_eq!(s, expected, "example on {line}");
}
}

View File

@@ -1,9 +1,8 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.
use std::sync::Arc;
use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::TryLockError;
pub struct GuardArcSwap<T> {

View File

@@ -1,6 +1,5 @@
use std::fmt;
use std::num::ParseIntError;
use std::str::FromStr;
use std::{fmt, str::FromStr};
use anyhow::Context;
use hex::FromHex;
@@ -216,7 +215,7 @@ macro_rules! id_newtype {
impl AsRef<[u8]> for $t {
fn as_ref(&self) -> &[u8] {
&self.0.0
&self.0 .0
}
}
@@ -368,9 +367,10 @@ impl FromStr for NodeId {
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use super::*;
use crate::bin_ser::BeSer;
use super::*;
#[test]
fn test_id_serde_non_human_readable() {
let original_id = Id([

View File

@@ -21,12 +21,15 @@
//!
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
time::Duration,
};
use tokio::sync::Notify;
use tokio::time::Instant;
use tokio::{sync::Notify, time::Instant};
pub struct LeakyBucketConfig {
/// This is the "time cost" of a single request unit.

View File

@@ -2,23 +2,21 @@
//!
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
use std::io;
use std::mem::MaybeUninit;
use std::os::fd::RawFd;
use std::os::raw::c_int;
use std::{
io,
mem::MaybeUninit,
os::{fd::RawFd, raw::c_int},
};
use nix::libc::{FIONREAD, TIOCOUTQ};
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
// SAFETY: encapsulating fn is unsafe, we require `socket_fd` to be a valid file descriptor
unsafe {
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
}
@@ -26,14 +24,12 @@ unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int>
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, FIONREAD) }
do_ioctl(socket_fd, FIONREAD)
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, TIOCOUTQ) }
do_ioctl(socket_fd, TIOCOUTQ)
}

View File

@@ -6,15 +6,16 @@
//! there for potential pitfalls with lock files that are used
//! to store PIDs (pidfiles).
use std::fs;
use std::io::{Read, Write};
use std::ops::Deref;
use std::os::unix::prelude::AsRawFd;
use std::{
fs,
io::{Read, Write},
ops::Deref,
os::unix::prelude::AsRawFd,
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno::EAGAIN;
use nix::fcntl;
use nix::{errno::Errno::EAGAIN, fcntl};
use crate::crashsafe;

View File

@@ -273,9 +273,7 @@ fn log_panic_to_stderr(
location: Option<PrettyLocation<'_, '_>>,
backtrace: &std::backtrace::Backtrace,
) {
eprintln!(
"panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
);
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
}
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
@@ -363,8 +361,7 @@ pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output
#[cfg(test)]
mod tests {
use metrics::IntCounterVec;
use metrics::core::Opts;
use metrics::{core::Opts, IntCounterVec};
use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};

View File

@@ -1,13 +1,11 @@
#![warn(missing_docs)]
use serde::{de::Visitor, Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use crate::seqwait::MonotonicCounter;
/// Transaction log block size in bytes
@@ -409,10 +407,11 @@ impl rand::distributions::uniform::UniformSampler for LsnSampler {
#[cfg(test)]
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
use crate::bin_ser::BeSer;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
#[test]
fn test_lsn_strings() {

View File

@@ -1,8 +1,7 @@
use pin_project_lite::pin_project;
use std::io::Read;
use std::pin::Pin;
use std::{io, task};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pin_project! {

View File

@@ -1,7 +1,7 @@
use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{PG_EPOCH, read_cstr};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use tracing::{trace, warn};

View File

@@ -3,7 +3,7 @@
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;

View File

@@ -53,11 +53,10 @@ mod tests {
#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use super::RateLimit;
let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));

View File

@@ -1,7 +1,7 @@
use sentry::ClientInitGuard;
use std::borrow::Cow;
use std::env;
use sentry::ClientInitGuard;
pub use sentry::release_name;
#[must_use]

View File

@@ -5,7 +5,6 @@ use std::collections::BinaryHeap;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch::{self, channel};
use tokio::time::timeout;
@@ -249,7 +248,11 @@ where
let internal = self.internal.lock().unwrap();
let cnt = internal.current.cnt_value();
drop(internal);
if cnt >= num { Ok(()) } else { Err(cnt) }
if cnt >= num {
Ok(())
} else {
Err(cnt)
}
}
/// Register and return a channel that will be notified when a number arrives,
@@ -322,9 +325,8 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use std::sync::Arc;
impl MonotonicCounter<i32> for i32 {
fn cnt_advance(&mut self, val: i32) {

View File

@@ -12,7 +12,11 @@ pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);
impl Percent {
pub const fn new(pct: u8) -> Option<Self> {
if pct <= 100 { Some(Percent(pct)) } else { None }
if pct <= 100 {
Some(Percent(pct))
} else {
None
}
}
pub fn get(&self) -> u8 {

View File

@@ -1,7 +1,6 @@
//! See `pageserver_api::shard` for description on sharding.
use std::ops::RangeInclusive;
use std::str::FromStr;
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
@@ -60,7 +59,11 @@ impl ShardCount {
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 { self.0 } else { 1 }
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the

View File

@@ -1,7 +1,7 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
pub enum Signal {
Quit,
Interrupt,

View File

@@ -44,7 +44,8 @@
#![warn(missing_docs)]
use std::ops::Deref;
use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
use std::sync::{Arc, Weak};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
@@ -218,11 +219,10 @@ impl RcuWaitList {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use std::time::Duration;
use super::*;
#[tokio::test]
async fn two_writers() {
let rcu = Rcu::new(1);

View File

@@ -1,6 +1,10 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///

View File

@@ -1,6 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -300,13 +301,14 @@ impl Drop for InitPermit {
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use std::pin::{Pin, pin};
use std::time::Duration;
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
time::Duration,
};
#[tokio::test]
async fn many_initializers() {

View File

@@ -1,5 +1,4 @@
use core::future::poll_fn;
use core::task::Poll;
use core::{future::poll_fn, task::Poll};
use std::sync::{Arc, Mutex};
use diatomic_waker::DiatomicWaker;

View File

@@ -1,8 +1,9 @@
use std::io;
use std::net::{TcpListener, ToSocketAddrs};
use std::{
io,
net::{TcpListener, ToSocketAddrs},
};
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::ReuseAddr;
use nix::sys::socket::{setsockopt, sockopt::ReuseAddr};
/// Bind a [`TcpListener`] to addr with `SO_REUSEADDR` set to true.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {

View File

@@ -172,14 +172,16 @@ fn tracing_subscriber_configured() -> bool {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::fmt::{self};
use std::hash::{Hash, Hasher};
use tracing_subscriber::prelude::*;
use super::*;
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl MemoryIdentity<'_> {

View File

@@ -44,11 +44,9 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arc_swap::ArcSwap;
use super::*;
use arc_swap::ArcSwap;
use std::sync::Arc;
#[test]
fn test_try_rcu_success() {

Some files were not shown because too many files have changed in this diff Show More