Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Bayandin
8dabb5d21f run-python-test-set: remove needs_postgres_source input 2025-01-31 01:35:25 +00:00
229 changed files with 5261 additions and 9696 deletions

View File

@@ -24,4 +24,3 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build_tools/patches

View File

@@ -3,7 +3,6 @@ name: Bug Template
about: Used for describing bugs
title: ''
labels: t/bug
type: Bug
assignees: ''
---

View File

@@ -4,7 +4,6 @@ about: A set of related tasks contributing towards specific outcome, comprising
more than 1 week of work.
title: 'Epic: '
labels: t/Epic
type: Epic
assignees: ''
---

View File

@@ -27,4 +27,3 @@ config-variables:
- SLACK_ON_CALL_QA_STAGING_STREAM
- DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_CICD_CHANNEL_ID

View File

@@ -41,10 +41,7 @@ inputs:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
project_settings:
description: 'A JSON object with project settings'
required: false
default: '{}'
outputs:
dsn:
@@ -76,7 +73,7 @@ runs:
\"provisioner\": \"k8s-neonvm\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": ${PROJECT_SETTINGS}
\"settings\": { }
}
}")
@@ -95,12 +92,12 @@ runs:
if [ "${SHARD_SPLIT_PROJECT}" = "true" ]; then
# determine tenant ID
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
echo "Splitting project ${project_id} with tenant_id ${TENANT_ID} into $((SHARD_COUNT)) shards with stripe size $((STRIPE_SIZE))"
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split"
echo "with body {\"new_shard_count\": $((SHARD_COUNT)), \"new_stripe_size\": $((STRIPE_SIZE))}"
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
curl -X PUT \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split" \
@@ -121,4 +118,3 @@ runs:
STRIPE_SIZE: ${{ inputs.stripe_size }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}

View File

@@ -12,10 +12,6 @@ inputs:
description: 'Arbitrary parameters to pytest. For example "-s" to prevent capturing stdout/stderr'
required: false
default: ''
needs_postgres_source:
description: 'Set to true if the test suite requires postgres source checked out'
required: false
default: 'false'
run_in_parallel:
description: 'Whether to run tests in parallel'
required: false
@@ -84,12 +80,6 @@ runs:
skip-if-does-not-exist: true
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v4
with:
submodules: true
- name: Cache poetry deps
uses: actions/cache@v4
with:
@@ -121,8 +111,6 @@ runs:
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib
export BENCHMARK_CONNSTR=${BENCHMARK_CONNSTR:-}
export ASAN_OPTIONS=detect_leaks=0:detect_stack_use_after_return=0:abort_on_error=1:strict_string_checks=1:check_initialization_order=1:strict_init_order=1
export UBSAN_OPTIONS=abort_on_error=1:print_stacktrace=1
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1

View File

@@ -1,5 +1,4 @@
rust_code: ['**/*.rs', '**/Cargo.toml', '**/Cargo.lock']
rust_dependencies: ['**/Cargo.lock']
v14: ['vendor/postgres-v14/**', 'Makefile', 'pgxn/**']
v15: ['vendor/postgres-v15/**', 'Makefile', 'pgxn/**']

View File

@@ -23,11 +23,6 @@ on:
description: 'a json object of postgres versions and lfc states to run regression tests on'
required: true
type: string
sanitizers:
description: 'enabled or disabled'
required: false
default: 'disabled'
type: string
defaults:
run:
@@ -92,7 +87,6 @@ jobs:
- name: Set env variables
env:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
@@ -105,14 +99,8 @@ jobs:
cov_prefix=""
CARGO_FLAGS="--locked --release"
fi
if [[ $SANITIZERS == 'enabled' ]]; then
make_vars="WITH_SANITIZERS=yes"
else
make_vars=""
fi
{
echo "cov_prefix=${cov_prefix}"
echo "make_vars=${make_vars}"
echo "CARGO_FEATURES=${CARGO_FEATURES}"
echo "CARGO_FLAGS=${CARGO_FLAGS}"
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
@@ -148,39 +136,37 @@ jobs:
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
run: mold -run make postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
run: mold -run make postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
run: mold -run make postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
run: mold -run make postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
run: mold -run make neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
run: mold -run make walproposer-lib -j$(nproc)
- name: Run cargo build
env:
WITH_TESTS: ${{ inputs.sanitizers != 'enabled' && '--tests' || '' }}
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
env:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
@@ -195,7 +181,7 @@ jobs:
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' && $SANITIZERS != 'enabled' ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
@@ -228,10 +214,11 @@ jobs:
role-duration-seconds: 18000 # 5 hours
- name: Run rust tests
if: ${{ inputs.sanitizers != 'enabled' }}
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v17/lib
export LD_LIBRARY_PATH
@@ -284,27 +271,6 @@ jobs:
path: /tmp/neon
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Check diesel schema
if: inputs.build-type == 'release' && inputs.arch == 'x64'
env:
DATABASE_URL: postgresql://localhost:1235/storage_controller
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
run: |
export ASAN_OPTIONS=detect_leaks=0
/tmp/neon/bin/neon_local init
/tmp/neon/bin/neon_local storage_controller start
diesel print-schema > storage_controller/src/schema.rs
if [ -n "$(git diff storage_controller/src/schema.rs)" ]; then
echo >&2 "Uncommitted changes in diesel schema"
git diff .
exit 1
fi
/tmp/neon/bin/neon_local storage_controller stop
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
if: inputs.build-type == 'debug'
@@ -337,11 +303,10 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 60 || 180 }}
timeout-minutes: 60
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
@@ -355,7 +320,6 @@ jobs:
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
SANITIZERS: ${{ inputs.sanitizers }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -16,9 +16,6 @@ defaults:
run:
shell: bash -euxo pipefail {0}
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
jobs:
check-codestyle-rust:
strategy:
@@ -87,3 +84,8 @@ jobs:
run: |
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check --hide-inclusion-graph

View File

@@ -67,9 +67,9 @@ jobs:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}
ref: main
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Look for existing PR
id: get-pr
env:
@@ -77,7 +77,7 @@ jobs:
run: |
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --base main --json number --jq '.[].number')"
echo "ALREADY_CREATED=${ALREADY_CREATED}" >> ${GITHUB_OUTPUT}
- name: Get changed labels
id: get-labels
if: steps.get-pr.outputs.ALREADY_CREATED != ''
@@ -94,6 +94,8 @@ jobs:
echo "LABELS_TO_ADD=${LABELS_TO_ADD}" >> ${GITHUB_OUTPUT}
echo "LABELS_TO_REMOVE=${LABELS_TO_REMOVE}" >> ${GITHUB_OUTPUT}
- run: gh pr checkout "${PR_NUMBER}"
- run: git checkout -b "${BRANCH}"
- run: git push --force origin "${BRANCH}"
@@ -101,7 +103,7 @@ jobs:
- name: Create a Pull Request for CI run (if required)
if: steps.get-pr.outputs.ALREADY_CREATED == ''
env:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
cat << EOF > body.md
@@ -138,7 +140,7 @@ jobs:
- run: git push --force origin "${BRANCH}"
if: steps.get-pr.outputs.ALREADY_CREATED != ''
cleanup:
# Close PRs and delete branchs if the original PR is closed.

View File

@@ -340,7 +340,7 @@ jobs:
],
"pg_version" : [
16,17
]
],
}'
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
@@ -810,7 +810,7 @@ jobs:
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
# if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
permissions:
contents: write
statuses: write
@@ -929,7 +929,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
user-examples-compare:
# if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
permissions:
contents: write
statuses: write

View File

@@ -235,7 +235,7 @@ jobs:
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build (only for v17)
run: cargo build --all --release -j$(sysctl -n hw.ncpu)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced (only for v17)
run: ./run_clippy.sh

View File

@@ -45,26 +45,6 @@ jobs:
run cancel-previous-in-concurrency-group.yml \
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
files-changed:
needs: [ check-permissions ]
runs-on: [ self-hosted, small ]
timeout-minutes: 3
outputs:
check-rust-dependencies: ${{ steps.files-changed.outputs.rust_dependencies }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
- name: Check for file changes
uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 # v3.0.2
id: files-changed
with:
token: ${{ secrets.GITHUB_TOKEN }}
filters: .github/file-filters.yaml
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, small ]
@@ -190,14 +170,6 @@ jobs:
archs: '["x64", "arm64"]'
secrets: inherit
check-dependencies-rust:
needs: [ files-changed, build-build-tools-image ]
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' }}
uses: ./.github/workflows/cargo-deny.yml
with:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
@@ -682,7 +654,7 @@ jobs:
push: true
pull: true
file: compute/compute-node.Dockerfile
target: extension-tests
target: neon-pg-ext-test
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
tags: |
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
@@ -1360,8 +1332,6 @@ jobs:
- build-and-test-locally
- check-codestyle-python
- check-codestyle-rust
- check-dependencies-rust
- files-changed
- promote-images-dev
- test-images
- trigger-custom-extensions-build-and-wait
@@ -1374,11 +1344,4 @@ jobs:
if: |
contains(needs.*.result, 'failure')
|| contains(needs.*.result, 'cancelled')
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true')
|| needs.build-and-test-locally.result == 'skipped'
|| needs.check-codestyle-python.result == 'skipped'
|| needs.check-codestyle-rust.result == 'skipped'
|| needs.files-changed.result == 'skipped'
|| needs.promote-images-dev.result == 'skipped'
|| needs.test-images.result == 'skipped'
|| needs.trigger-custom-extensions-build-and-wait.result == 'skipped'
|| contains(needs.*.result, 'skipped')

View File

@@ -1,134 +0,0 @@
name: Build and Test with Sanitizers
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 1 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
test-cfg: '[{"pg_version":"v17"}]'
sanitizers: enabled
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@v7
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -1,57 +0,0 @@
name: cargo deny checks
on:
workflow_call:
inputs:
build-tools-image:
required: false
type: string
schedule:
- cron: '0 0 * * *'
jobs:
cargo-deny:
strategy:
matrix:
ref: >-
${{
fromJSON(
github.event_name == 'schedule'
&& '["main","release","release-proxy","release-compute"]'
|| format('["{0}"]', github.sha)
)
}}
runs-on: [self-hosted, small]
container:
image: ${{ inputs.build-tools-image || 'neondatabase/build-tools:pinned' }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ matrix.ref }}
- name: Check rust licenses/bans/advisories/sources
env:
CARGO_DENY_TARGET: >-
${{ github.event_name == 'schedule' && 'advisories' || 'all' }}
run: cargo deny check --hide-inclusion-graph $CARGO_DENY_TARGET
- name: Post to a Slack channel
if: ${{ github.event_name == 'schedule' && failure() }}
uses: slackapi/slack-github-action@v2
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_CICD_CHANNEL_ID }}
text: |
Periodic cargo-deny on ${{ matrix.ref }}: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
Pinging @oncall-devprod.

View File

@@ -114,7 +114,7 @@ jobs:
run: make walproposer-lib -j$(nproc)
- name: Produce the build stats
run: cargo build --all --release --timings -j$(nproc)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc)
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4

View File

@@ -12,8 +12,8 @@ on:
pull_request:
paths:
- '.github/workflows/pg-clients.yml'
- 'test_runner/pg_clients/**/*.py'
- 'test_runner/logical_repl/**/*.py'
- 'test_runner/pg_clients/**'
- 'test_runner/logical_repl/**'
- 'poetry.lock'
workflow_dispatch:
@@ -104,8 +104,6 @@ jobs:
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
project_settings: >-
{"enable_logical_replication": true}
- name: Run tests
uses: ./.github/actions/run-python-test-set

View File

@@ -59,10 +59,7 @@ jobs:
echo "${RUST_CHANGED_FILES}"
build-build-tools-image:
if: |
false
|| needs.get-changed-files.outputs.python-changed == 'true'
|| needs.get-changed-files.outputs.rust-changed == 'true'
if: needs.get-changed-files.outputs.python-changed == 'true'
needs: [ get-changed-files ]
uses: ./.github/workflows/build-build-tools-image.yml
with:
@@ -95,8 +92,7 @@ jobs:
# - conclusion
# - neon-cloud-e2e
conclusion:
# Do not run job on Pull Requests as it interferes with the `conclusion` job from the `build_and_test` workflow
if: always() && github.event_name == 'merge_group'
if: always()
permissions:
statuses: write # for `github.repos.createCommitStatus(...)`
contents: write
@@ -128,8 +124,6 @@ jobs:
- name: Fail the job if any of the dependencies do not succeed or skipped
run: exit 1
if: |
false
|| (needs.check-codestyle-python.result == 'skipped' && needs.get-changed-files.outputs.python-changed == 'true')
|| (needs.check-codestyle-rust.result == 'skipped' && needs.get-changed-files.outputs.rust-changed == 'true')
(contains(needs.check-codestyle-python.result, 'skipped') && needs.get-changed-files.outputs.python-changed == 'true')
|| contains(needs.*.result, 'failure')
|| contains(needs.*.result, 'cancelled')

338
Cargo.lock generated
View File

@@ -206,16 +206,6 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-channel"
version = "1.9.0"
@@ -786,7 +776,7 @@ dependencies = [
[[package]]
name = "azure_core"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -815,7 +805,7 @@ dependencies = [
[[package]]
name = "azure_identity"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
dependencies = [
"async-lock",
"async-trait",
@@ -834,7 +824,7 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
dependencies = [
"RustyXML",
"async-lock",
@@ -852,7 +842,7 @@ dependencies = [
[[package]]
name = "azure_storage_blobs"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
dependencies = [
"RustyXML",
"azure_core",
@@ -872,7 +862,7 @@ dependencies = [
[[package]]
name = "azure_svc_blobstorage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#985db729824be324ed11527e45de722250028d9e"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#66e77bdd87bf87e773acf3b0c84b532c1124367d"
dependencies = [
"azure_core",
"bytes",
@@ -951,18 +941,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bb8"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
dependencies = [
"async-trait",
"futures-util",
"parking_lot 0.12.1",
"tokio",
]
[[package]]
name = "bcder"
version = "0.7.4"
@@ -988,7 +966,7 @@ version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"cexpr",
"clang-sys",
"itertools 0.12.1",
@@ -1016,9 +994,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.8.0"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
[[package]]
name = "block-buffer"
@@ -1029,12 +1007,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
[[package]]
name = "bstr"
version = "1.5.0"
@@ -1241,20 +1213,6 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
[[package]]
name = "clashmap"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d"
dependencies = [
"crossbeam-utils",
"hashbrown 0.15.2",
"lock_api",
"parking_lot_core 0.9.8",
"polonius-the-crab",
"replace_with",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
@@ -1433,7 +1391,6 @@ dependencies = [
"comfy-table",
"compute_api",
"futures",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
@@ -1592,7 +1549,7 @@ version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"crossterm_winapi",
"libc",
"parking_lot 0.12.1",
@@ -1823,29 +1780,16 @@ version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf1bedf64cdb9643204a36dd15b19a6ce8e7aa7f7b105868e9f1fad5ffa7d12"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
[[package]]
name = "diesel-async"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb"
dependencies = [
"async-trait",
"bb8",
"diesel",
"futures-util",
"scoped-futures",
"tokio",
"tokio-postgres",
]
[[package]]
name = "diesel_derives"
version = "2.2.1"
@@ -2459,16 +2403,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gettid"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "gimli"
version = "0.31.1"
@@ -2597,12 +2531,6 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "hashlink"
version = "0.9.1"
@@ -2653,15 +2581,6 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
[[package]]
name = "higher-kinded-types"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "561985554c8b8d4808605c90a5f1979cc6c31a5d20b78465cd59501233c6678e"
dependencies = [
"never-say-never",
]
[[package]]
name = "hmac"
version = "0.12.1"
@@ -2758,38 +2677,6 @@ dependencies = [
"url",
]
[[package]]
name = "http-utils"
version = "0.1.0"
dependencies = [
"anyhow",
"backtrace",
"bytes",
"fail",
"flate2",
"hyper 0.14.30",
"inferno 0.12.0",
"itertools 0.10.5",
"jemalloc_pprof",
"metrics",
"once_cell",
"pprof",
"regex",
"routerify",
"serde",
"serde_json",
"serde_path_to_error",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"url",
"utils",
"uuid",
"workspace_hack",
]
[[package]]
name = "httparse"
version = "1.8.0"
@@ -3172,11 +3059,11 @@ dependencies = [
[[package]]
name = "inotify"
version = "0.11.0"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 2.8.0",
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
@@ -3353,9 +3240,9 @@ dependencies = [
[[package]]
name = "kqueue"
version = "1.0.8"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98"
dependencies = [
"kqueue-sys",
"libc",
@@ -3363,9 +3250,9 @@ dependencies = [
[[package]]
name = "kqueue-sys"
version = "1.0.4"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags 1.3.2",
"libc",
@@ -3392,9 +3279,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.169"
version = "0.2.167"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc"
[[package]]
name = "libloading"
@@ -3641,14 +3528,14 @@ dependencies = [
[[package]]
name = "mio"
version = "1.0.3"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -3657,12 +3544,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "never-say-never"
version = "6.6.666"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
[[package]]
name = "nix"
version = "0.25.1"
@@ -3694,7 +3575,7 @@ version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"cfg-if",
"libc",
"memoffset 0.9.0",
@@ -3712,11 +3593,12 @@ dependencies = [
[[package]]
name = "notify"
version = "8.0.0"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
@@ -3724,17 +3606,10 @@ dependencies = [
"libc",
"log",
"mio",
"notify-types",
"walkdir",
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]
name = "notify-types"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]]
name = "ntapi"
version = "0.4.1"
@@ -4144,7 +4019,6 @@ dependencies = [
"futures",
"hex",
"hex-literal",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
@@ -4245,7 +4119,6 @@ dependencies = [
"anyhow",
"bytes",
"futures",
"http-utils",
"pageserver_api",
"postgres",
"reqwest",
@@ -4282,16 +4155,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
dependencies = [
"equivalent",
"seize",
]
[[package]]
name = "parking"
version = "2.1.1"
@@ -4558,20 +4421,10 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "polonius-the-crab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97ca2c89572ae41bbec1c99498251f87dd5a94e500c5ec19c382dd593dd5ce9"
dependencies = [
"higher-kinded-types",
"never-say-never",
]
[[package]]
name = "postgres"
version = "0.19.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
version = "0.19.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4584,9 +4437,9 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
dependencies = [
"base64 0.22.1",
"base64 0.21.1",
"byteorder",
"bytes",
"fallible-iterator",
@@ -4618,7 +4471,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
dependencies = [
"bytes",
"chrono",
@@ -4750,6 +4603,15 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
dependencies = [
"vcpkg",
]
[[package]]
name = "pq_proto"
version = "0.1.0"
@@ -4798,7 +4660,7 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"chrono",
"flate2",
"hex",
@@ -4813,7 +4675,7 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"chrono",
"hex",
]
@@ -4919,7 +4781,6 @@ dependencies = [
"ahash",
"anyhow",
"arc-swap",
"assert-json-diff",
"async-compression",
"async-trait",
"atomic-take",
@@ -4927,16 +4788,15 @@ dependencies = [
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"boxcar",
"bstr",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"clap",
"clashmap",
"compute_api",
"consumption_metrics",
"dashmap 5.5.0",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger 0.10.2",
@@ -4944,7 +4804,6 @@ dependencies = [
"flate2",
"framed-websockets",
"futures",
"gettid",
"hashbrown 0.14.5",
"hashlink",
"hex",
@@ -4952,7 +4811,6 @@ dependencies = [
"hostname",
"http 1.1.0",
"http-body-util",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
@@ -4968,9 +4826,7 @@ dependencies = [
"measured",
"metrics",
"once_cell",
"opentelemetry",
"p256 0.13.2",
"papaya",
"parking_lot 0.12.1",
"parquet",
"parquet_derive",
@@ -5017,9 +4873,6 @@ dependencies = [
"tokio-tungstenite 0.21.0",
"tokio-util",
"tracing",
"tracing-log",
"tracing-opentelemetry",
"tracing-serde",
"tracing-subscriber",
"tracing-utils",
"try-lock",
@@ -5071,6 +4924,17 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -5351,12 +5215,6 @@ dependencies = [
"utils",
]
[[package]]
name = "replace_with"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
[[package]]
name = "reqwest"
version = "0.12.4"
@@ -5636,7 +5494,7 @@ version = "0.38.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"errno",
"libc",
"linux-raw-sys 0.4.14",
@@ -5800,7 +5658,6 @@ dependencies = [
"futures",
"hex",
"http 1.1.0",
"http-utils",
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
@@ -5865,7 +5722,6 @@ dependencies = [
name = "safekeeper_client"
version = "0.1.0"
dependencies = [
"http-utils",
"reqwest",
"safekeeper_api",
"serde",
@@ -5893,12 +5749,12 @@ dependencies = [
]
[[package]]
name = "scoped-futures"
version = "0.1.4"
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"pin-project-lite",
"parking_lot 0.12.1",
]
[[package]]
@@ -5975,16 +5831,6 @@ dependencies = [
"libc",
]
[[package]]
name = "seize"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "semver"
version = "1.0.17"
@@ -6443,12 +6289,10 @@ dependencies = [
"clap",
"control_plane",
"diesel",
"diesel-async",
"diesel_migrations",
"fail",
"futures",
"hex",
"http-utils",
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
@@ -6459,12 +6303,10 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"rand 0.8.5",
"reqwest",
"routerify",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"scoped-futures",
"scopeguard",
"serde",
"serde_json",
@@ -6472,8 +6314,6 @@ dependencies = [
"strum_macros",
"thiserror 1.0.69",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-util",
"tracing",
"utils",
@@ -6716,7 +6556,7 @@ dependencies = [
"fastrand 2.2.0",
"once_cell",
"rustix",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -6928,20 +6768,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.43.0"
version = "1.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -6972,9 +6813,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.5.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",
@@ -6983,8 +6824,8 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.10"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
version = "0.7.9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#8b44892f7851e705810b2cb54504325699966070"
dependencies = [
"async-trait",
"byteorder",
@@ -7259,7 +7100,7 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
dependencies = [
"bitflags 2.8.0",
"bitflags 2.4.1",
"bytes",
"http 1.1.0",
"http-body 1.0.0",
@@ -7613,38 +7454,48 @@ dependencies = [
"criterion",
"diatomic-waker",
"fail",
"flate2",
"futures",
"git-version",
"hex",
"hex-literal",
"humantime",
"hyper 0.14.30",
"inferno 0.12.0",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"once_cell",
"pin-project-lite",
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
"routerify",
"scopeguard",
"sentry",
"serde",
"serde_assert",
"serde_json",
"serde_path_to_error",
"serde_with",
"signal-hook",
"strum",
"strum_macros",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",
"tracing-error",
"tracing-subscriber",
"url",
"uuid",
"walkdir",
]
@@ -7664,6 +7515,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@@ -7746,9 +7603,9 @@ dependencies = [
[[package]]
name = "walkdir"
version = "2.5.0"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698"
dependencies = [
"same-file",
"winapi-util",
@@ -8000,15 +7857,6 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
@@ -8237,9 +8085,7 @@ dependencies = [
"tower 0.4.13",
"tracing",
"tracing-core",
"tracing-log",
"url",
"uuid",
"zerocopy",
"zeroize",
"zstd",

View File

@@ -18,7 +18,6 @@ members = [
"storage_scrubber",
"workspace_hack",
"libs/compute_api",
"libs/http-utils",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/safekeeper_api",
@@ -55,7 +54,6 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
backtrace = "0.3.74"
flate2 = "1.0.26"
assert-json-diff = "2"
async-stream = "0.3"
async-trait = "0.1"
aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] }
@@ -79,10 +77,10 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
dashmap = { version = "5.5.0", features = ["raw-api"] }
diatomic-waker = { version = "0.2.3" }
either = "1.8"
enum-map = "2.4.2"
@@ -125,7 +123,7 @@ measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
notify = "8.0.0"
notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
@@ -179,7 +177,7 @@ test-context = "0.3"
thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.41", features = ["macros"] }
tokio = { version = "1.17", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
@@ -195,9 +193,7 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }
@@ -230,7 +226,6 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -45,7 +45,7 @@ COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -10,29 +10,18 @@ ICU_PREFIX_DIR := /usr/local/icu
# environment variable.
#
BUILD_TYPE ?= debug
WITH_SANITIZERS ?= no
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS = -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS = -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
ifeq ($(WITH_SANITIZERS),yes)
PG_CFLAGS += -fsanitize=address -fsanitize=undefined -fno-sanitize-recover
COPT += -Wno-error # to avoid failing on warnings induced by sanitizers
PG_LDFLAGS = -fsanitize=address -fsanitize=undefined -static-libasan -static-libubsan $(LDFLAGS)
export CC := gcc
export ASAN_OPTIONS := detect_leaks=0
endif
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
# Exclude static build openssl, icu for local build (MacOS, Linux)
# Only keep for build type release and debug
@@ -44,9 +33,7 @@ endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
ifneq ($(WITH_SANITIZERS),yes)
PG_CONFIGURE_OPTS += --with-libseccomp
endif
PG_CONFIGURE_OPTS += --with-libseccomp
else ifeq ($(UNAME_S),Darwin)
PG_CFLAGS += -DUSE_PREFETCH
ifndef DISABLE_HOMEBREW
@@ -77,6 +64,8 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
# Force cargo not to print progress bar
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel)
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55"
@@ -119,7 +108,7 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$$VERSION > configure.log)

View File

@@ -3,17 +3,10 @@ ARG DEBIAN_VERSION=bookworm
FROM debian:bookworm-slim AS pgcopydb_builder
ARG DEBIAN_VERSION
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
# By default, /bin/sh used in debian images will treat '\n' as eol,
# but as we use bash as SHELL, and built-in echo in bash requires '-e' flag for that.
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt update && \
@@ -46,7 +39,6 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
mkdir /tmp/pgcopydb && \
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
cd /tmp/pgcopydb && \
patch -p1 < /pgcopydbv017.patch && \
make -s clean && \
make -s -j12 install && \
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \
@@ -63,8 +55,7 @@ ARG DEBIAN_VERSION
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
SHELL ["/bin/bash", "-c"]
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
@@ -75,7 +66,7 @@ COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/p
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
# System deps
@@ -199,14 +190,8 @@ RUN set -e \
# It includes several bug fixes on top on v2.0 release (https://github.com/linux-test-project/lcov/compare/v2.0...master)
# And patches from us:
# - Generates json file with code coverage summary (https://github.com/neondatabase/lcov/commit/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz)
RUN set +o pipefail && \
for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do \
yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')";\
done && \
set -o pipefail
# Split into separate step to debug flaky failures here
RUN wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
&& ls -laht lcov.tar.gz && sha256sum lcov.tar.gz \
RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JSON::XS Memory::Process Time::HiRes JSON; do yes | perl -MCPAN -e "CPAN::Shell->notest('install', '$package')"; done \
&& wget https://github.com/neondatabase/lcov/archive/426e7e7a22f669da54278e9b55e6d8caabd00af0.tar.gz -O lcov.tar.gz \
&& echo "61a22a62e20908b8b9e27d890bd0ea31f567a7b9668065589266371dcbca0992 lcov.tar.gz" | sha256sum --check \
&& mkdir -p lcov && tar -xzf lcov.tar.gz -C lcov --strip-components=1 \
&& cd lcov \
@@ -268,7 +253,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.84.1
ENV RUSTC_VERSION=1.84.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
@@ -276,7 +261,6 @@ ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \
@@ -290,8 +274,6 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

View File

@@ -1,57 +0,0 @@
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
index d730b03..69a9be9 100644
--- a/src/bin/pgcopydb/copydb.c
+++ b/src/bin/pgcopydb/copydb.c
@@ -44,6 +44,7 @@ GUC dstSettings[] = {
{ "synchronous_commit", "'off'" },
{ "statement_timeout", "0" },
{ "lock_timeout", "0" },
+ { "idle_in_transaction_session_timeout", "0" },
{ NULL, NULL },
};
diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c
index 94f2f46..e051ba8 100644
--- a/src/bin/pgcopydb/pgsql.c
+++ b/src/bin/pgcopydb/pgsql.c
@@ -2319,6 +2319,11 @@ pgsql_execute_log_error(PGSQL *pgsql,
LinesBuffer lbuf = { 0 };
+ if (message != NULL){
+ // make sure message is writable by splitLines
+ message = strdup(message);
+ }
+
if (!splitLines(&lbuf, message))
{
/* errors have already been logged */
@@ -2332,6 +2337,7 @@ pgsql_execute_log_error(PGSQL *pgsql,
PQbackendPID(pgsql->connection),
lbuf.lines[lineNumber]);
}
+ free(message); // free copy of message we created above
if (pgsql->logSQL)
{
@@ -3174,11 +3180,18 @@ pgcopy_log_error(PGSQL *pgsql, PGresult *res, const char *context)
/* errors have already been logged */
return;
}
-
if (res != NULL)
{
char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ if (sqlstate == NULL)
+ {
+ // PQresultErrorField returned NULL!
+ pgsql->sqlstate[0] = '\0'; // Set to an empty string to avoid segfault
+ }
+ else
+ {
+ strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ }
}
char *endpoint =

File diff suppressed because it is too large Load Diff

View File

@@ -6,16 +6,16 @@ index da723b8..5328114 100644
----
-- No.A-1-1-3
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
-- No.A-1-2-3
DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4
CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
DROP SCHEMA other_schema;
----
---- No. A-5-1 comment pattern
@@ -35,7 +35,7 @@ index d372459..6282afe 100644
SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -6,16 +6,16 @@ index e7d68a1..65a056c 100644
----
-- No.A-1-1-3
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
-- No.A-1-2-3
DROP EXTENSION pg_hint_plan;
-- No.A-1-1-4
CREATE SCHEMA other_schema;
CREATE EXTENSION pg_hint_plan SCHEMA other_schema;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
ERROR: extension "pg_hint_plan" must be installed in schema "hint_plan"
CREATE EXTENSION pg_hint_plan;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/pg_hint_plan
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/pg_hint_plan
DROP SCHEMA other_schema;
----
---- No. A-5-1 comment pattern
@@ -168,7 +168,7 @@ index 017fa4b..98d989b 100644
SET client_min_messages TO LOG;
SET pg_hint_plan.enable_hint TO on;
CREATE EXTENSION file_fdw;
+LOG: Sending request to compute_ctl: http://localhost:3081/extension_server/file_fdw
+LOG: Sending request to compute_ctl: http://localhost:3080/extension_server/file_fdw
CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;
CREATE USER MAPPING FOR PUBLIC SERVER file_server;
CREATE FOREIGN TABLE ft1 (id int, val int) SERVER file_server OPTIONS (format 'csv', filename :'filename');

View File

@@ -47,9 +47,7 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -34,21 +34,18 @@
//! -r http://pg-ext-s3-gateway \
//! ```
use std::collections::HashMap;
use std::ffi::OsString;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::time::SystemTime;
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use clap::Arg;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -63,6 +60,7 @@ use compute_tools::compute::{
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
@@ -75,110 +73,11 @@ use utils::failpoint_support;
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
} else {
Ok("http://pg-ext-s3-gateway".to_string())
}
}
/// Generate a compute ID if one is not supplied. This exists to keep forward
/// compatibility tests working, but will be removed in a future iteration.
fn generate_compute_id() -> String {
let now = SystemTime::now();
format!(
"compute-{}",
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
)
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running
/// outside the compute will talk to the compute through this port. Keep
/// the previous name for this argument around for a smoother release
/// with the control plane.
///
/// TODO: Remove the alias after the control plane release which teaches the
/// control plane about the renamed argument.
#[arg(long, alias = "http-port", default_value_t = 3080)]
pub external_http_port: u16,
/// The port to bind the internal listening HTTP server to. Clients like
/// the neon extension (for installing remote extensions) and local_proxy.
#[arg(long)]
pub internal_http_port: Option<u16>,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
#[arg(short = 'C', long, value_name = "DATABASE_URL")]
pub connstr: String,
#[cfg(target_os = "linux")]
#[arg(long, default_value = "neon-postgres")]
pub cgroup: String,
#[cfg(target_os = "linux")]
#[arg(
long,
default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
)]
pub filecache_connstr: String,
#[cfg(target_os = "linux")]
#[arg(long, default_value = "0.0.0.0:10301")]
pub vm_monitor_addr: String,
#[arg(long, action = clap::ArgAction::SetTrue)]
pub resize_swap_on_bind: bool,
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 's', long = "spec", group = "spec")]
pub spec_json: Option<String>,
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
pub compute_id: String,
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
pub control_plane_uri: Option<String>,
}
fn main() -> Result<()> {
let cli = Cli::parse();
// For historical reasons, the main thread that processes the spec and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
// from all parts of compute_ctl.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let _rt_guard = runtime.enter();
let build_tag = runtime.block_on(init())?;
let scenario = failpoint_support::init();
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -186,11 +85,13 @@ fn main() -> Result<()> {
// Enter startup tracing context
let _startup_context_guard = startup_context_from_env();
let cli_spec = try_spec_from_cli(&cli)?;
let cli_args = process_cli(&clap_args)?;
let compute = wait_spec(build_tag, &cli, cli_spec)?;
let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
start_postgres(&cli, compute)?
let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
start_postgres(&clap_args, wait_spec_result)?
// Startup is finished, exit the startup tracing span
};
@@ -207,8 +108,8 @@ fn main() -> Result<()> {
deinit_and_exit(wait_pg_result);
}
async fn init() -> Result<String> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
fn init() -> Result<(String, clap::ArgMatches)> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -222,7 +123,66 @@ async fn init() -> Result<String> {
.to_string();
info!("build_tag: {build_tag}");
Ok(build_tag)
Ok((build_tag, cli().get_matches()))
}
fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
.map(|s| s.as_str())
.unwrap_or(pgbin_default);
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
.map(|conf| {
if conf.starts_with("http") {
conf.trim_end_matches('/')
} else {
"http://pg-ext-s3-gateway"
}
});
let http_port = *matches
.get_one::<u16>("http-port")
.expect("http-port is required");
let pgdata = matches
.get_one::<String>("pgdata")
.expect("PGDATA path is required");
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind");
let set_disk_quota_for_fs = matches.get_one::<String>("set-disk-quota-for-fs");
Ok(ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
http_port,
spec_json,
spec_path,
resize_swap_on_bind,
set_disk_quota_for_fs,
})
}
struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,
spec_json: Option<&'clap String>,
spec_path: Option<&'clap String>,
resize_swap_on_bind: bool,
set_disk_quota_for_fs: Option<&'clap String>,
}
fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
@@ -275,9 +235,19 @@ fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
}
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
fn try_spec_from_cli(
matches: &clap::ArgMatches,
ProcessCliResult {
spec_json,
spec_path,
..
}: &ProcessCliResult,
) -> Result<CliSpecParams> {
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
// First, try to get cluster spec from the cli argument
if let Some(ref spec_json) = cli.spec_json {
if let Some(spec_json) = spec_json {
info!("got spec from cli argument {}", spec_json);
return Ok(CliSpecParams {
spec: Some(serde_json::from_str(spec_json)?),
@@ -286,7 +256,7 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
}
// Second, try to read it from the file if path is provided
if let Some(ref spec_path) = cli.spec_path {
if let Some(spec_path) = spec_path {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
@@ -294,11 +264,17 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
});
}
if cli.control_plane_uri.is_none() {
panic!("must specify --control-plane-uri");
let Some(compute_id) = compute_id else {
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
};
let Some(control_plane_uri) = control_plane_uri else {
panic!("must specify both --control-plane-uri and --compute-id or none");
};
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
match get_spec_from_control_plane(control_plane_uri, compute_id) {
Ok(spec) => Ok(CliSpecParams {
spec,
live_config_allowed: true,
@@ -322,12 +298,21 @@ struct CliSpecParams {
fn wait_spec(
build_tag: String,
cli: &Cli,
ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
resize_swap_on_bind,
set_disk_quota_for_fs,
http_port,
..
}: ProcessCliResult,
CliSpecParams {
spec,
live_config_allowed,
}: CliSpecParams,
) -> Result<Arc<ComputeNode>> {
) -> Result<WaitSpecResult> {
let mut new_state = ComputeState::new();
let spec_set;
@@ -339,25 +324,23 @@ fn wait_spec(
} else {
spec_set = false;
}
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?;
let conn_conf = postgres::config::Config::from_str(connstr.as_str())
.context("cannot build postgres config from connstr")?;
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
.context("cannot build tokio postgres config from connstr")?;
let compute_node = ComputeNode {
compute_id: cli.compute_id.clone(),
connstr,
conn_conf,
tokio_conn_conf,
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version_string(pgbin),
http_port,
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: cli.remote_ext_config.clone(),
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
};
@@ -371,13 +354,10 @@ fn wait_spec(
compute.prewarm_postgres()?;
}
// Launch the external HTTP server first, so that we can serve control plane
// requests while configuration is still in progress.
Server::External(cli.external_http_port).launch(&compute);
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec provided, hang waiting for it.
@@ -409,12 +389,27 @@ fn wait_spec(
launch_lsn_lease_bg_task_for_static(&compute);
Ok(compute)
Ok(WaitSpecResult {
compute,
resize_swap_on_bind,
set_disk_quota_for_fs: set_disk_quota_for_fs.cloned(),
})
}
struct WaitSpecResult {
compute: Arc<ComputeNode>,
resize_swap_on_bind: bool,
set_disk_quota_for_fs: Option<String>,
}
fn start_postgres(
cli: &Cli,
compute: Arc<ComputeNode>,
// need to allow unused because `matches` is only used if target_os = "linux"
#[allow(unused_variables)] matches: &clap::ArgMatches,
WaitSpecResult {
compute,
resize_swap_on_bind,
set_disk_quota_for_fs,
}: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
@@ -442,7 +437,7 @@ fn start_postgres(
let mut delay_exit = false;
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
if let (Some(size_bytes), true) = (swap_size_bytes, resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
@@ -469,9 +464,9 @@ fn start_postgres(
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
(disk_quota_bytes, set_disk_quota_for_fs)
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
match set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
@@ -514,6 +509,27 @@ fn start_postgres(
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
let vm_monitor_addr = matches
.get_one::<String>("vm-monitor-addr")
.expect("--vm-monitor-addr should always be set because it has a default arg");
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
let cgroup = matches.get_one::<String>("cgroup");
// Only make a runtime if we need to.
// Note: it seems like you can make a runtime in an inner scope and
// if you start a task in it it won't be dropped. However, make it
// in the outermost scope just to be safe.
let rt = if env::var_os("AUTOSCALING").is_some() {
Some(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.expect("failed to create tokio runtime for monitor")
)
} else {
None
};
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
@@ -522,22 +538,19 @@ fn start_postgres(
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
file_cache_connstr.cloned()
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
cgroup: cgroup.cloned(),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
addr: vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
))
});
}
}
@@ -547,6 +560,8 @@ fn start_postgres(
delay_exit,
compute,
#[cfg(target_os = "linux")]
rt,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
@@ -554,13 +569,15 @@ fn start_postgres(
))
}
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);
type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);
struct StartPostgresResult {
delay_exit: bool,
// passed through from WaitSpecResult
compute: Arc<ComputeNode>,
#[cfg(target_os = "linux")]
rt: Option<tokio::runtime::Runtime>,
#[cfg(target_os = "linux")]
token: tokio_util::sync::CancellationToken,
#[cfg(target_os = "linux")]
@@ -579,10 +596,10 @@ fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
.expect("failed to start waiting on Postgres process");
PG_PID.store(0, Ordering::SeqCst);
// Process has exited. Wait for the log collecting task to finish.
let _ = tokio::runtime::Handle::current()
.block_on(logs_handle)
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
// Process has exited, so we can join the logs thread.
let _ = logs_handle
.join()
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code()
@@ -603,6 +620,8 @@ fn cleanup_after_postgres_exit(
vm_monitor,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
rt,
}: StartPostgresResult,
) -> Result<bool> {
// Terminate the vm_monitor so it releases the file watcher on
@@ -615,6 +634,10 @@ fn cleanup_after_postgres_exit(
token.cancel();
// Kills the actual task running the monitor
handle.abort();
// If handle is some, rt must have been used to produce it, and
// hence is also some
rt.unwrap().shutdown_timeout(Duration::from_secs(2));
}
}
}
@@ -679,6 +702,105 @@ fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
exit(exit_code.unwrap_or(1))
}
fn cli() -> clap::Command {
// Env variable is set by `cargo`
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
clap::Command::new("compute_ctl")
.version(version)
.arg(
Arg::new("http-port")
.long("http-port")
.value_name("HTTP_PORT")
.default_value("3080")
.value_parser(clap::value_parser!(u16))
.required(false),
)
.arg(
Arg::new("connstr")
.short('C')
.long("connstr")
.value_name("DATABASE_URL")
.required(true),
)
.arg(
Arg::new("pgdata")
.short('D')
.long("pgdata")
.value_name("DATADIR")
.required(true),
)
.arg(
Arg::new("pgbin")
.short('b')
.long("pgbin")
.default_value("postgres")
.value_name("POSTGRES_PATH"),
)
.arg(
Arg::new("spec")
.short('s')
.long("spec")
.value_name("SPEC_JSON"),
)
.arg(
Arg::new("spec-path")
.short('S')
.long("spec-path")
.value_name("SPEC_PATH"),
)
.arg(
Arg::new("compute-id")
.short('i')
.long("compute-id")
.value_name("COMPUTE_ID"),
)
.arg(
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
.arg(
Arg::new("remote-ext-config")
.short('r')
.long("remote-ext-config")
.value_name("REMOTE_EXT_CONFIG"),
)
// TODO(fprasx): we currently have default arguments because the cloud PR
// to pass them in hasn't been merged yet. We should get rid of them once
// the PR is merged.
.arg(
Arg::new("vm-monitor-addr")
.long("vm-monitor-addr")
.default_value("0.0.0.0:10301")
.value_name("VM_MONITOR_ADDR"),
)
.arg(
Arg::new("cgroup")
.long("cgroup")
.default_value("neon-postgres")
.value_name("CGROUP"),
)
.arg(
Arg::new("filecache-connstr")
.long("filecache-connstr")
.default_value(
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
)
.value_name("FILECACHE_CONNSTR"),
)
.arg(
Arg::new("resize-swap-on-bind")
.long("resize-swap-on-bind")
.action(clap::ArgAction::SetTrue),
)
.arg(
Arg::new("set-disk-quota-for-fs")
.long("set-disk-quota-for-fs")
.value_name("SET_DISK_QUOTA_FOR_FS")
)
}
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
/// wait for termination which would be easy then.
@@ -688,14 +810,7 @@ fn handle_exit_signal(sig: i32) {
exit(1);
}
#[cfg(test)]
mod test {
use clap::CommandFactory;
use super::Cli;
#[test]
fn verify_cli() {
Cli::command().debug_assert()
}
#[test]
fn verify_cli() {
cli().debug_assert()
}

View File

@@ -60,16 +60,6 @@ struct Args {
pg_lib_dir: Utf8PathBuf,
#[clap(long)]
pg_port: Option<u16>, // port to run postgres on, 5432 is default
/// Number of CPUs in the system. This is used to configure # of
/// parallel worker processes, for index creation.
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
num_cpus: Option<usize>,
/// Amount of RAM in the system. This is used to configure shared_buffers
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
}
#[serde_with::serde_as]
@@ -212,16 +202,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
.await
.context("initdb")?;
// If the caller didn't specify CPU / RAM to use for sizing, default to
// number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
let memory_mb = args.memory_mb.unwrap_or(256);
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
// available for misc other stuff that PostgreSQL uses memory for.
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
let nproc = num_cpus::get();
//
// Launch postgres process
@@ -231,15 +212,12 @@ pub(crate) async fn main() -> anyhow::Result<()> {
.arg(&pgdata_dir)
.args(["-p", &format!("{pg_port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
.args(["-c", "shared_buffers=10GB"])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args([
"-c",
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
])
.args(["-c", "maintenance_work_mem=8388608"])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
@@ -253,14 +231,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()

View File

@@ -140,34 +140,5 @@ pub async fn get_database_schema(
warn!("pg_dump stderr: {}", line)
}
});
#[allow(dead_code)]
struct SchemaStream<S> {
// We keep a reference to the child process to ensure it stays alive
// while the stream is being consumed. When SchemaStream is dropped,
// cmd will be dropped, which triggers kill_on_drop and terminates pg_dump
cmd: tokio::process::Child,
stream: S,
}
impl<S> Stream for SchemaStream<S>
where
S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
{
type Item = Result<bytes::Bytes, std::io::Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.stream), cx)
}
}
let schema_stream = SchemaStream {
cmd,
stream: initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))),
};
Ok(schema_stream)
Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))))
}

View File

@@ -9,6 +9,7 @@ use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use std::time::Instant;
@@ -58,8 +59,6 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0);
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
/// The ID of the compute
pub compute_id: String,
// Url type maintains proper escaping
pub connstr: url::Url,
// We connect to Postgres from many different places, so build configs once
@@ -82,10 +81,8 @@ pub struct ComputeNode {
/// - we push spec and it does configuration
/// - but then it is restarted without any spec again
pub live_config_allowed: bool,
/// The port that the compute's external HTTP server listens on
pub external_http_port: u16,
/// The port that the compute's internal HTTP server listens on
pub internal_http_port: u16,
/// The port that the compute's HTTP server listens on
pub http_port: u16,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// To allow HTTP API server to serving status requests, while configuration
/// is in progress, lock should be held only for short periods of time to do
@@ -549,7 +546,11 @@ impl ComputeNode {
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
let start_time = Utc::now();
let rt = tokio::runtime::Handle::current();
// Run actual work with new tokio runtime
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
// Record runtime
@@ -596,9 +597,9 @@ impl ComputeNode {
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
// Process has exited, so we can join the logs thread.
let _ = tokio::runtime::Handle::current()
.block_on(logs_handle)
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
let _ = logs_handle
.join()
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
if !sync_output.status.success() {
anyhow::bail!(
@@ -633,7 +634,7 @@ impl ComputeNode {
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
self.internal_http_port,
self.http_port,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -783,7 +784,7 @@ impl ComputeNode {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
@@ -799,7 +800,7 @@ impl ComputeNode {
.expect("cannot start postgres process");
PG_PID.store(pg.id(), Ordering::SeqCst);
// Start a task to collect logs from stderr.
// Start a thread to collect logs from stderr.
let stderr = pg.stderr.take().expect("stderr should be captured");
let logs_handle = handle_postgres_logs(stderr);
@@ -808,28 +809,20 @@ impl ComputeNode {
Ok((pg, logs_handle))
}
/// Do post configuration of the already started Postgres. This function spawns a background task to
/// Do post configuration of the already started Postgres. This function spawns a background thread to
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
/// version. In the future, it may upgrade all 3rd-party extensions.
#[instrument(skip_all)]
pub fn post_apply_config(&self) -> Result<()> {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:post_apply_config"));
tokio::spawn(async move {
let res = async {
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config"));
thread::spawn(move || {
let func = || {
let mut client = conf.connect(NoTls)?;
handle_neon_extension_upgrade(&mut client)
.await
.context("handle_neon_extension_upgrade")?;
Ok::<_, anyhow::Error>(())
}
.await;
if let Err(err) = res {
};
if let Err(err) = func() {
error!("error while post_apply_config: {err:#}");
}
});
@@ -926,10 +919,13 @@ impl ComputeNode {
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
@@ -1323,18 +1319,14 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
thread::spawn(move || {
let conf = conf.as_ref().clone();
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");
match conf.connect(NoTls).await {
Ok((mut client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
if let Err(e) = handle_migrations(&mut client).await {
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
@@ -1371,11 +1363,16 @@ impl ComputeNode {
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
info!("tuning pgbouncer");
// Spawn a background task to do the tuning,
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
// Spawn a thread to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
tokio::spawn(async move {
let res = tune_pgbouncer(pgbouncer_settings).await;
let _handle = thread::spawn(move || {
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
@@ -1385,20 +1382,20 @@ impl ComputeNode {
if let Some(ref local_proxy) = spec.local_proxy_config {
info!("configuring local_proxy");
// Spawn a background task to do the configuration,
// Spawn a thread to do the configuration,
// so that we don't block the main thread that starts Postgres.
let local_proxy = local_proxy.clone();
tokio::spawn(async move {
let _handle = Some(thread::spawn(move || {
if let Err(err) = local_proxy::configure(&local_proxy) {
error!("error while configuring local_proxy: {err:?}");
}
});
}));
}
// Write new config
let pgdata_path = Path::new(&self.pgdata);
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, self.internal_http_port)?;
config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?;
let max_concurrent_connections = spec.reconfigure_concurrency;
@@ -1434,9 +1431,7 @@ impl ComputeNode {
}
#[instrument(skip_all)]
pub fn start_compute(
&self,
) -> Result<(std::process::Child, tokio::task::JoinHandle<Result<()>>)> {
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
@@ -1451,11 +1446,16 @@ impl ComputeNode {
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
// Spawn a background task to do the tuning,
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
// Spawn a thread to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
let _handle = tokio::spawn(async move {
let res = tune_pgbouncer(pgbouncer_settings).await;
let _handle = thread::spawn(move || {
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
@@ -1465,10 +1465,10 @@ impl ComputeNode {
if let Some(local_proxy) = &pspec.spec.local_proxy_config {
info!("configuring local_proxy");
// Spawn a background task to do the configuration,
// Spawn a thread to do the configuration,
// so that we don't block the main thread that starts Postgres.
let local_proxy = local_proxy.clone();
let _handle = tokio::spawn(async move {
let _handle = thread::spawn(move || {
if let Err(err) = local_proxy::configure(&local_proxy) {
error!("error while configuring local_proxy: {err:?}");
}
@@ -1487,8 +1487,7 @@ impl ComputeNode {
extension_server::create_control_files(remote_extensions, &self.pgbin);
let library_load_start_time = Utc::now();
let rt = tokio::runtime::Handle::current();
let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?;
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
let library_load_time = Utc::now()
.signed_duration_since(library_load_start_time)
@@ -1543,7 +1542,7 @@ impl ComputeNode {
self.post_apply_config()?;
let conf = self.get_conn_conf(None);
tokio::task::spawn_blocking(|| {
thread::spawn(move || {
let res = get_installed_extensions(conf);
match res {
Ok(extensions) => {
@@ -1892,6 +1891,7 @@ LIMIT 100",
Ok(ext_version)
}
#[tokio::main]
pub async fn prepare_preload_libraries(
&self,
spec: &ComputeSpec,

View File

@@ -51,12 +51,9 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
let runtime = tokio::runtime::Handle::current();
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
let _rt_guard = runtime.enter();
configurator_main_loop(&compute);
info!("configurator thread is exited");
})

View File

@@ -4,9 +4,11 @@ use http::{header::CONTENT_TYPE, StatusCode};
use serde::Serialize;
use tracing::error;
pub use server::launch_http_server;
mod extract;
mod routes;
pub mod server;
mod server;
/// Convenience response builder for JSON responses
struct JsonResponse;

View File

@@ -1,21 +1,7 @@
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
use utils::failpoint_support::{apply_failpoint, ConfigureFailpointsRequest};
use crate::http::{extract::Json, JsonResponse};

View File

@@ -1,7 +1,7 @@
use std::{
fmt::Display,
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::Arc,
thread,
time::Duration,
};
@@ -26,65 +26,46 @@ use super::routes::{
};
use crate::compute::ComputeNode;
async fn handle_404() -> Response {
StatusCode::NOT_FOUND.into_response()
}
const X_REQUEST_ID: &str = "x-request-id";
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
/// the compute. The external server is what receives communication from the
/// control plane, the metrics scraper, etc. We make the distinction because
/// certain routes in `compute_ctl` only need to be exposed to local processes
/// like Postgres via the neon extension and local_proxy.
#[derive(Clone, Copy, Debug)]
pub enum Server {
Internal(u16),
External(u16),
}
/// This middleware function allows compute_ctl to generate its own request ID
/// if one isn't supplied. The control plane will always send one as a UUID. The
/// neon Postgres extension on the other hand does not send one.
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
impl Display for Server {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Server::Internal(_) => f.write_str("internal"),
Server::External(_) => f.write_str("external"),
}
if headers.get(X_REQUEST_ID).is_none() {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
}
next.run(request).await
}
impl From<Server> for Router<Arc<ComputeNode>> {
fn from(server: Server) -> Self {
let mut router = Router::<Arc<ComputeNode>>::new();
router = match server {
Server::Internal(_) => {
router = router
.route(
"/extension_server/{*filename}",
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant));
// Add in any testing support
if cfg!(feature = "testing") {
use super::routes::failpoints;
router = router.route("/failpoints", post(failpoints::configure_failpoints));
}
router
}
Server::External(_) => router
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))
.route("/metrics", get(metrics::get_metrics))
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate)),
};
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
/// Run the HTTP server and wait on it forever.
#[tokio::main]
async fn serve(port: u16, compute: Arc<ComputeNode>) {
let mut app = Router::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route(
"/extension_server/{*filename}",
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant))
.route("/insights", get(insights::get_insights))
.route("/metrics", get(metrics::get_metrics))
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.fallback(handle_404)
.layer(
ServiceBuilder::new()
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
@@ -124,88 +105,45 @@ impl From<Server> for Router<Arc<ComputeNode>> {
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
}
}
.with_state(compute);
impl Server {
async fn handle_404() -> impl IntoResponse {
StatusCode::NOT_FOUND
// Add in any testing support
if cfg!(feature = "testing") {
use super::routes::failpoints;
app = app.route("/failpoints", post(failpoints::configure_failpoints))
}
async fn handle_405() -> impl IntoResponse {
StatusCode::METHOD_NOT_ALLOWED
}
async fn listener(&self) -> Result<TcpListener> {
let addr = SocketAddr::new(self.ip(), self.port());
let listener = TcpListener::bind(&addr).await?;
Ok(listener)
}
fn ip(&self) -> IpAddr {
match self {
// TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners
// allow binding to localhost
Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
}
}
fn port(self) -> u16 {
match self {
Server::Internal(port) => port,
Server::External(port) => port,
}
}
async fn serve(self, compute: Arc<ComputeNode>) {
let listener = self.listener().await.unwrap_or_else(|e| {
// If we can't bind, the compute cannot operate correctly
panic!(
"failed to bind the compute_ctl {} HTTP server to {}: {}",
self,
SocketAddr::new(self.ip(), self.port()),
e
);
});
if tracing::enabled!(tracing::Level::INFO) {
let local_addr = match listener.local_addr() {
Ok(local_addr) => local_addr,
Err(_) => SocketAddr::new(self.ip(), self.port()),
};
info!(
"compute_ctl {} HTTP server listening at {}",
self, local_addr
// This usually binds to both IPv4 and IPv6 on Linux, see
// https://github.com/rust-lang/rust/pull/34440 for more information
let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
let listener = match TcpListener::bind(&addr).await {
Ok(listener) => listener,
Err(e) => {
error!(
"failed to bind the compute_ctl HTTP server to port {}: {}",
port, e
);
return;
}
};
let router = Router::from(self).with_state(compute);
if let Err(e) = axum::serve(listener, router).await {
error!("compute_ctl {} HTTP server error: {}", self, e);
}
if let Ok(local_addr) = listener.local_addr() {
info!("compute_ctl HTTP server listening on {}", local_addr);
} else {
info!("compute_ctl HTTP server listening on port {}", port);
}
pub fn launch(self, compute: &Arc<ComputeNode>) {
let state = Arc::clone(compute);
info!("Launching the {} server", self);
tokio::spawn(self.serve(state));
if let Err(e) = axum::serve(listener, app).await {
error!("compute_ctl HTTP server error: {}", e);
}
}
/// This middleware function allows compute_ctl to generate its own request ID
/// if one isn't supplied. The control plane will always send one as a UUID. The
/// neon Postgres extension on the other hand does not send one.
async fn maybe_add_request_id_header(mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
if headers.get(X_REQUEST_ID).is_none() {
headers.append(X_REQUEST_ID, Uuid::new_v4().to_string().parse().unwrap());
}
/// Launch a separate HTTP server thread and return its `JoinHandle`.
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
next.run(request).await
Ok(thread::Builder::new()
.name("http-server".into())
.spawn(move || serve(port, state))?)
}

View File

@@ -11,7 +11,7 @@ use tracing_subscriber::prelude::*;
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// `tracing-utils` package description.
///
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
@@ -22,7 +22,7 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
.with_writer(std::io::stderr);
// Initialize OpenTelemetry
let otlp_layer = tracing_utils::init_tracing("compute_ctl").await;
let otlp_layer = tracing_utils::init_tracing_without_runtime("compute_ctl");
// Put it all together
tracing_subscriber::registry()

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result};
use fail::fail_point;
use tokio_postgres::{Client, Transaction};
use postgres::{Client, Transaction};
use tracing::{error, info};
use crate::metrics::DB_MIGRATION_FAILED;
@@ -21,11 +21,10 @@ impl<'m> MigrationRunner<'m> {
}
/// Get the current value neon_migration.migration_id
async fn get_migration_id(&mut self) -> Result<i64> {
fn get_migration_id(&mut self) -> Result<i64> {
let row = self
.client
.query_one("SELECT id FROM neon_migration.migration_id", &[])
.await?;
.query_one("SELECT id FROM neon_migration.migration_id", &[])?;
Ok(row.get::<&str, i64>("id"))
}
@@ -35,7 +34,7 @@ impl<'m> MigrationRunner<'m> {
/// This function has a fail point called compute-migration, which can be
/// used if you would like to fail the application of a series of migrations
/// at some point.
async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
@@ -60,38 +59,31 @@ impl<'m> MigrationRunner<'m> {
"UPDATE neon_migration.migration_id SET id = $1",
&[&migration_id],
)
.await
.with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
Ok(())
}
/// Prepare the migrations the target database for handling migrations
async fn prepare_database(&mut self) -> Result<()> {
fn prepare_database(&mut self) -> Result<()> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
self.client.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)
.await?;
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
self.client
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
.await?;
self.client
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
.await?;
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
Ok(())
}
/// Run an individual migration in a separate transaction block.
async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.await
.with_context(|| format!("begin transaction for migration {migration_id}"))?;
if migration.starts_with("-- SKIP") {
@@ -100,38 +92,35 @@ impl<'m> MigrationRunner<'m> {
// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
Self::update_migration_id(&mut txn, migration_id).await?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);
txn.simple_query(migration)
.await
.with_context(|| format!("apply migration {migration_id}"))?;
Self::update_migration_id(&mut txn, migration_id).await?;
Self::update_migration_id(&mut txn, migration_id)?;
}
txn.commit()
.await
.with_context(|| format!("commit transaction for migration {migration_id}"))?;
Ok(())
}
/// Run the configured set of migrations
pub async fn run_migrations(mut self) -> Result<()> {
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_database()
.await
.context("prepare database to handle migrations")?;
let mut current_migration = self.get_migration_id().await? as usize;
let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
match Self::run_migration(self.client, migration_id, migration).await {
match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -7,6 +7,7 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Child;
use std::str::FromStr;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
@@ -15,7 +16,6 @@ use ini::Ini;
use notify::{RecursiveMode, Watcher};
use postgres::config::Config;
use tokio::io::AsyncBufReadExt;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_postgres;
use tokio_postgres::NoTls;
@@ -477,13 +477,23 @@ pub async fn tune_pgbouncer(pgbouncer_config: HashMap<String, String>) -> Result
Ok(())
}
/// Spawn a task that will read Postgres logs from `stderr`, join multiline logs
/// Spawn a thread that will read Postgres logs from `stderr`, join multiline logs
/// and send them to the logger. In the future we may also want to add context to
/// these logs.
pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let stderr = tokio::process::ChildStderr::from_std(stderr)?;
handle_postgres_logs_async(stderr).await
pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()> {
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime");
let res = runtime.block_on(async move {
let stderr = tokio::process::ChildStderr::from_std(stderr)?;
handle_postgres_logs_async(stderr).await
});
if let Err(e) = res {
tracing::error!("error while processing postgres logs: {}", e);
}
})
}

View File

@@ -1,8 +1,8 @@
use anyhow::{anyhow, bail, Result};
use postgres::Client;
use reqwest::StatusCode;
use std::fs::File;
use std::path::Path;
use tokio_postgres::Client;
use tracing::{error, info, instrument, warn};
use crate::config;
@@ -166,17 +166,17 @@ pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
info!("handle neon extension upgrade");
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension version with query: {}", query);
client.simple_query(query).await?;
client.simple_query(query)?;
Ok(())
}
#[instrument(skip_all)]
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
pub fn handle_migrations(client: &mut Client) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -206,9 +206,7 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
),
];
MigrationRunner::new(client, &migrations)
.run_migrations()
.await?;
MigrationRunner::new(client, &migrations).run_migrations()?;
Ok(())
}
@@ -216,7 +214,7 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
/// Connect to the database as superuser and pre-create anon extension
/// if it is present in shared_preload_libraries
#[instrument(skip_all)]
pub async fn handle_extension_anon(
pub fn handle_extension_anon(
spec: &ComputeSpec,
db_owner: &str,
db_client: &mut Client,
@@ -229,7 +227,7 @@ pub async fn handle_extension_anon(
if !grants_only {
// check if extension is already initialized using anon.is_initialized()
let query = "SELECT anon.is_initialized()";
match db_client.query(query, &[]).await {
match db_client.query(query, &[]) {
Ok(rows) => {
if !rows.is_empty() {
let is_initialized: bool = rows[0].get(0);
@@ -251,7 +249,7 @@ pub async fn handle_extension_anon(
// Users cannot create it themselves, because superuser is required.
let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
info!("creating anon extension with query: {}", query);
match db_client.query(query, &[]).await {
match db_client.query(query, &[]) {
Ok(_) => {}
Err(e) => {
error!("anon extension creation failed with error: {}", e);
@@ -261,7 +259,7 @@ pub async fn handle_extension_anon(
// check that extension is installed
query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
let rows = db_client.query(query, &[]).await?;
let rows = db_client.query(query, &[])?;
if rows.is_empty() {
error!("anon extension is not installed");
return Ok(());
@@ -270,7 +268,7 @@ pub async fn handle_extension_anon(
// Initialize anon extension
// This also requires superuser privileges, so users cannot do it themselves.
query = "SELECT anon.init()";
match db_client.query(query, &[]).await {
match db_client.query(query, &[]) {
Ok(_) => {}
Err(e) => {
error!("anon.init() failed with error: {}", e);
@@ -281,7 +279,7 @@ pub async fn handle_extension_anon(
// check that extension is installed, if not bail early
let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
match db_client.query(query, &[]).await {
match db_client.query(query, &[]) {
Ok(rows) => {
if rows.is_empty() {
error!("anon extension is not installed");
@@ -296,12 +294,12 @@ pub async fn handle_extension_anon(
let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
info!("granting anon extension permissions with query: {}", query);
db_client.simple_query(&query).await?;
db_client.simple_query(&query)?;
// Grant permissions to db_owner to use anon extension functions
let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
info!("granting anon extension permissions with query: {}", query);
db_client.simple_query(&query).await?;
db_client.simple_query(&query)?;
// This is needed, because some functions are defined as SECURITY DEFINER.
// In Postgres SECURITY DEFINER functions are executed with the privileges
@@ -316,16 +314,16 @@ pub async fn handle_extension_anon(
where nsp.nspname = 'anon';", db_owner);
info!("change anon extension functions owner to db owner");
db_client.simple_query(&query).await?;
db_client.simple_query(&query)?;
// affects views as well
let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
info!("granting anon extension permissions with query: {}", query);
db_client.simple_query(&query).await?;
db_client.simple_query(&query)?;
let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
info!("granting anon extension permissions with query: {}", query);
db_client.simple_query(&query).await?;
db_client.simple_query(&query)?;
}
}

View File

@@ -33,7 +33,6 @@ postgres_backend.workspace = true
safekeeper_api.workspace = true
postgres_connection.workspace = true
storage_broker.workspace = true
http-utils.workspace = true
utils.workspace = true
whoami.workspace = true

View File

@@ -261,13 +261,7 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
// Pass through these environment variables to the command
for var in [
"LLVM_PROFILE_FILE",
"FAILPOINTS",
"RUST_LOG",
"ASAN_OPTIONS",
"UBSAN_OPTIONS",
] {
for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
if let Some(val) = std::env::var_os(var) {
filled_cmd = filled_cmd.env(var, val);
}

View File

@@ -552,10 +552,8 @@ struct EndpointCreateCmdArgs {
lsn: Option<Lsn>,
#[clap(long)]
pg_port: Option<u16>,
#[clap(long, alias = "http-port")]
external_http_port: Option<u16>,
#[clap(long)]
internal_http_port: Option<u16>,
http_port: Option<u16>,
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
@@ -1355,8 +1353,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
tenant_id,
timeline_id,
args.pg_port,
args.external_http_port,
args.internal_http_port,
args.http_port,
args.pg_version,
mode,
!args.update_catalog,

View File

@@ -37,8 +37,6 @@
//! ```
//!
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::PathBuf;
@@ -75,8 +73,7 @@ pub struct EndpointConf {
timeline_id: TimelineId,
mode: ComputeMode,
pg_port: u16,
external_http_port: u16,
internal_http_port: u16,
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
@@ -131,7 +128,7 @@ impl ComputeControlPlane {
1 + self
.endpoints
.values()
.map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
.map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
.max()
.unwrap_or(self.base_port)
}
@@ -143,27 +140,18 @@ impl ComputeControlPlane {
tenant_id: TenantId,
timeline_id: TimelineId,
pg_port: Option<u16>,
external_http_port: Option<u16>,
internal_http_port: Option<u16>,
http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
external_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::UNSPECIFIED),
external_http_port,
),
internal_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::LOCALHOST),
internal_http_port,
),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
env: self.env.clone(),
timeline_id,
mode,
@@ -188,8 +176,7 @@ impl ComputeControlPlane {
tenant_id,
timeline_id,
mode,
external_http_port,
internal_http_port,
http_port,
pg_port,
pg_version,
skip_pg_catalog_updates,
@@ -243,10 +230,9 @@ pub struct Endpoint {
pub timeline_id: TimelineId,
pub mode: ComputeMode,
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
// port and address of the Postgres server and `compute_ctl`'s HTTP API
pub pg_address: SocketAddr,
pub external_http_address: SocketAddr,
pub internal_http_address: SocketAddr,
pub http_address: SocketAddr,
// postgres major version in the format: 14, 15, etc.
pg_version: u32,
@@ -301,15 +287,8 @@ impl Endpoint {
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
Ok(Endpoint {
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
external_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::UNSPECIFIED),
conf.external_http_port,
),
internal_http_address: SocketAddr::new(
IpAddr::from(Ipv4Addr::LOCALHOST),
conf.internal_http_port,
),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
endpoint_id,
env: env.clone(),
timeline_id: conf.timeline_id,
@@ -671,51 +650,24 @@ impl Endpoint {
println!("Also at '{}'", conn_str);
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
//cmd.args([
// "--external-http-port",
// &self.external_http_address.port().to_string(),
//])
//.args([
// "--internal-http-port",
// &self.internal_http_address.port().to_string(),
//])
cmd.args([
"--http-port",
&self.external_http_address.port().to_string(),
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.args([
"--spec-path",
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
.unwrap(),
])
// TODO: It would be nice if we generated compute IDs with the same
// algorithm as the real control plane.
//
// TODO: Add this back when
// https://github.com/neondatabase/neon/pull/10747 is merged.
//
//.args([
// "--compute-id",
// &format!(
// "compute-{}",
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs()
// ),
//])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
cmd.args(["--http-port", &self.http_address.port().to_string()])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.args([
"--spec-path",
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
.unwrap(),
])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
@@ -802,8 +754,8 @@ impl Endpoint {
reqwest::Method::GET,
format!(
"http://{}:{}/status",
self.external_http_address.ip(),
self.external_http_address.port()
self.http_address.ip(),
self.http_address.port()
),
)
.send()
@@ -876,8 +828,8 @@ impl Endpoint {
let response = client
.post(format!(
"http://{}:{}/configure",
self.external_http_address.ip(),
self.external_http_address.port()
self.http_address.ip(),
self.http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.body(format!(

View File

@@ -357,11 +357,6 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'compaction_algorithm' json")?,
compaction_l0_first: settings
.remove("compaction_l0_first")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'compaction_l0_first' as a bool")?,
l0_flush_delay_threshold: settings
.remove("l0_flush_delay_threshold")
.map(|x| x.parse::<usize>())
@@ -393,11 +388,6 @@ impl PageServerNode {
.map(|x| x.parse::<u8>())
.transpose()
.context("Failed to parse 'image_creation_check_threshold' as integer")?,
image_creation_preempt_threshold: settings
.remove("image_creation_preempt_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")

View File

@@ -17,10 +17,8 @@ use camino::Utf8PathBuf;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use http_utils::error::HttpErrorBody;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use utils::{http::error::HttpErrorBody, id::NodeId};
use crate::{
background_process,

View File

@@ -221,17 +221,7 @@ impl StorageController {
"-p",
&format!("{}", postgres_port),
];
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
let exitcode = Command::new(bin_path)
.args(args)
.envs(envs)
.spawn()?
.wait()
.await?;
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
}
@@ -252,11 +242,6 @@ impl StorageController {
let pg_bin_dir = self.get_pg_bin_dir().await?;
let createdb_path = pg_bin_dir.join("createdb");
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
let output = Command::new(&createdb_path)
.args([
"-h",
@@ -269,7 +254,6 @@ impl StorageController {
&username(),
DB_NAME,
])
.envs(envs)
.output()
.await
.expect("Failed to spawn createdb");

View File

@@ -32,7 +32,6 @@ reason = "the marvin attack only affects private key decryption, not public key
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
allow = [
"0BSD",
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",

View File

@@ -52,7 +52,6 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
if [ $pg_version -ge 16 ]; then
docker cp ext-src $TEST_CONTAINER_NAME:/
docker exec $TEST_CONTAINER_NAME bash -c "apt update && apt install -y libtap-parser-sourcehandler-pgtap-perl"
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config

View File

@@ -1,4 +0,0 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
pg_prove test.sql

View File

@@ -1,15 +0,0 @@
diff --git a/test.sql b/test.sql
index d7a0ca8..f15bc76 100644
--- a/test.sql
+++ b/test.sql
@@ -9,9 +9,7 @@
\set ON_ERROR_STOP true
\set QUIET 1
-CREATE EXTENSION pgcrypto;
-CREATE EXTENSION pgtap;
-CREATE EXTENSION pgjwt;
+CREATE EXTENSION IF NOT EXISTS pgtap;
BEGIN;
SELECT plan(23);

View File

@@ -1,5 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
pg_prove -d contrib_regression test.sql

View File

@@ -24,7 +24,7 @@ function wait_for_ready {
}
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext}"
done
}
EXTENSIONS='[
@@ -40,8 +40,7 @@ EXTENSIONS='[
{"extname": "pg_uuidv7", "extdir": "pg_uuidv7-src"},
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"}
{"extname": "pg_ivm", "extdir": "pg_ivm-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -285,10 +285,10 @@ To summarize, list of cplane changes:
### storage_controller implementation
If desired, we may continue using current 'load everything on startup and keep
in memory' approach: single timeline shouldn't take more than 100 bytes (it's 16
byte tenant_id, 16 byte timeline_id, int generation, vec of ~3 safekeeper ids
plus some flags), so 10^6 of timelines shouldn't take more than 100MB.
Current 'load everything on startup and keep in memory' easy design is fine.
Single timeline shouldn't take more than 100 bytes (it's 16 byte tenant_id, 16
byte timeline_id, int generation, vec of ~3 safekeeper ids plus some flags), so
10^6 of timelines shouldn't take more than 100MB.
Similar to pageserver attachment Intents storage_controller would have in-memory
`MigrationRequest` (or its absense) for each timeline and pool of tasks trying
@@ -296,7 +296,7 @@ to make these request reality; this ensures one instance of storage_controller
won't do several migrations on the same timeline concurrently. In the first
version it is simpler to have more manual control and no retries, i.e. migration
failure removes the request. Later we can build retries and automatic
scheduling/migration around. `MigrationRequest` is
scheduling/migration. `MigrationRequest` is
```
enum MigrationRequest {
To(Vec<NodeId>),
@@ -313,9 +313,9 @@ similarly, in the first version it is ok to trigger it manually).
#### Schema
`safekeepers` table mirroring current `nodes` should be added, except that for
`scheduling_policy`: it is enough to have at least in the beginning only 3
fields: 1) `active` 2) `paused` (initially means only not assign new tlis there
3) `decomissioned` (node is removed).
`scheduling_policy` field (seems like `status` is a better name for it): it is enough
to have at least in the beginning only 3 fields: 1) `active` 2) `offline` 3)
`decomissioned`.
`timelines` table:
```
@@ -324,24 +324,18 @@ table! {
timelines (tenant_id, timeline_id) {
timeline_id -> Varchar,
tenant_id -> Varchar,
start_lsn -> pg_lsn,
generation -> Int4,
sk_set -> Array<Int4>, // list of safekeeper ids
new_sk_set -> Nullable<Array<Int8>>, // list of safekeeper ids, null if not joint conf
new_sk_set -> Nullable<Array<Int4>>, // list of safekeeper ids, null if not joint conf
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
}
}
```
`start_lsn` is needed to create timeline on safekeepers properly, see below. We
might also want to add ancestor_timeline_id to preserve the hierarchy, but for
this RFC it is not needed.
#### API
Node management is similar to pageserver:
1) POST `/control/v1/safekeepers` inserts safekeeper.
1) POST `/control/v1/safekeepers` upserts safekeeper.
2) GET `/control/v1/safekeepers` lists safekeepers.
3) GET `/control/v1/safekeepers/:node_id` gets safekeeper.
4) PUT `/control/v1/safekepers/:node_id/status` changes status to e.g.
@@ -351,15 +345,25 @@ Node management is similar to pageserver:
Safekeeper deploy scripts should register safekeeper at storage_contorller as
they currently do with cplane, under the same id.
Timeline creation/deletion will work through already existing POST and DELETE
`tenant/:tenant_id/timeline`. Cplane is expected to retry both until they
succeed. See next section on the implementation details.
Timeline creation/deletion: already existing POST `tenant/:tenant_id/timeline`
would 1) choose initial set of safekeepers; 2) write to the db initial
`Configuration` with `INSERT ON CONFLICT DO NOTHING` returning existing row in
case of conflict; 3) create timeline on the majority of safekeepers (already
created is ok).
We don't want to block timeline creation/deletion when one safekeeper is down.
Currently this is crutched by compute implicitly creating timeline on any
safekeeper it is connected to. This creates ugly timeline state on safekeeper
when timeline is created, but start LSN is not defined yet. Next section
describes dealing with this.
We don't want to block timeline creation when one safekeeper is down. Currently
this is solved by compute implicitly creating timeline on any safekeeper it is
connected to. This creates ugly timeline state on safekeeper when timeline is
created, but start LSN is not defined yet. It would be nice to remove this; to
do that, controller can in the background retry to create timeline on
safekeeper(s) which missed that during initial creation call. It can do that
through `pull_timeline` from majority so it doesn't need to remember
`parent_lsn` in its db.
Timeline deletion removes the row from the db and forwards deletion to the
current configuration members. Without additional actions deletions might leak,
see below on this; initially let's ignore these, reporting to cplane success if
at least one safekeeper deleted the timeline (this will remove s3 data).
Tenant deletion repeats timeline deletion for all timelines.
@@ -391,6 +395,26 @@ Similar call should be added for the tenant.
It would be great to have some way of subscribing to the results (apart from
looking at logs/metrics).
Migration is executed as described above. One subtlety is that (local) deletion on
source safekeeper might fail, which is not a problem if we are going to
decomission the node but leaves garbage otherwise. I'd propose in the first version
1) Don't attempt deletion at all if node status is `offline`.
2) If it failed, just issue warning.
And add PUT `/control/v1/safekeepers/:node_id/scrub` endpoint which would find and
remove garbage timelines for manual use. It will 1) list all timelines on the
safekeeper 2) compare each one against configuration storage: if timeline
doesn't exist at all (had been deleted), it can be deleted. Otherwise, it can
be deleted under generation number if node is not member of current generation.
Automating this is untrivial; we'd need to register all potential missing
deletions <tenant_id, timeline_id, generation, node_id> in the same transaction
which switches configurations. Similarly when timeline is fully deleted to
prevent cplane operation from blocking when some safekeeper is not available
deletion should be also registered.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets.
3) GET `/control/v1/tenant/:tenant_id/timeline/:timeline_id/` should return
current in memory state of the timeline and pending `MigrationRequest`,
if any.
@@ -399,153 +423,12 @@ looking at logs/metrics).
migration by switching configuration from the joint to the one with (previous) `sk_set` under CAS
(incrementing generation as always).
#### API implementation and reconciliation
For timeline creation/deletion we want to preserve the basic assumption that
unreachable minority (1 sk of 3) doesn't block their completion, but eventually
we want to finish creation/deletion on nodes which missed it (unless they are
removed). Similarly for migration; it may and should finish even though excluded
members missed their exclusion. And of course e.g. such pending exclusion on
node C after migration ABC -> ABD must not prevent next migration ABD -> ABE. As
another example, if some node missed timeline creation it clearly must not block
migration from it. Hence it is natural to have per safekeeper background
reconciler which retries these ops until they succeed. There are 3 possible
operation types, and the type is defined by timeline state (membership
configuration and whether it is deleted) and safekeeper id: we may need to
create timeline on sk (node added), locally delete it (node excluded, somewhat
similar to detach) or globally delete it (timeline is deleted).
Next, on storage controller restart in principle these pending operations can be
figured out by comparing safekeepers state against storcon state. But it seems
better to me to materialize them in the database; it is not expensive, avoids
these startup scans which themselves can fail etc and makes it very easy to see
outstanding work directly at the source of truth -- the db. So we can add table
`safekeeper_timeline_pending_ops`
```
table! {
// timeline_id, sk_id is primary key
safekeeper_timeline_pending_ops (sk_id, tenant_id, timeline_id) {
sk_id -> int8,
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
op_type -> Varchar,
}
}
```
`op_type` can be `include` (seed from peers and ensure generation is up to
date), `exclude` (remove locally) and `delete`. Field is actually not strictly
needed as it can be computed from current configuration, but gives more explicit
observability.
`generation` is necessary there because after op is done reconciler must remove
it and not remove another row with higher gen which in theory might appear.
Any insert of row should overwrite (remove) all rows with the same sk and
timeline id but lower `generation` as next op makes previous obsolete. Insertion
of `op_type` `delete` overwrites all rows.
About `exclude`: rather than adding explicit safekeeper http endpoint, it is
reasonable to reuse membership switch endpoint: if safekeeper is not member
of the configuration it locally removes the timeline on the switch. In this case
404 should also be considered an 'ok' answer by the caller.
So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops`
joined with timeline configuration to get current conf (with generation `n`)
for the safekeeper and does the jobs, infinitely retrying failures:
1) If node is member (`include`):
- Check if timeline exists on it, if not, call pull_timeline on it from
other members
- Call switch configuration to the current
2) If node is not member (`exclude`):
- Call switch configuration to the current, 404 is ok.
3) If timeline is deleted (`delete`), call delete.
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
timeline with generation <= `n` if `op_type` is not `delete`.
In case 3 also remove `safekeeper_timeline_pending_ops`
entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline.
Let's consider in details how APIs can be implemented from this angle.
Timeline creation. It is assumed that cplane retries it until success, so all
actions must be idempotent. Now, a tricky point here is timeline start LSN. For
the initial (tenant creation) call cplane doesn't know it. However, setting
start_lsn on safekeepers during creation is a good thing -- it provides a
guarantee that walproposer can always find a common point in WAL histories of
safekeeper and its own, and so absense of it would be a clear sign of
corruption. The following sequence works:
1) Create timeline (or observe that it exists) on pageserver,
figuring out last_record_lsn in response.
2) Choose safekeepers and insert (ON CONFLICT DO NOTHING) timeline row into the
db. Note that last_record_lsn returned on the previous step is movable as it
changes once ingestion starts, insert must not overwrite it (as well as other
fields like membership conf). On the contrary, start_lsn used in the next
step must be set to the value in the db. cplane_notified_generation can be set
to 1 (initial generation) in insert to avoid notifying cplane about initial
conf as cplane will receive it in timeline creation request anyway.
3) Issue timeline creation calls to at least majority of safekeepers. Using
majority here is not necessary but handy because it guarantees that any live
majority will have at least one sk with created timeline and so
reconciliation task can use pull_timeline shared with migration instead of
create timeline special init case. OFC if timeline is already exists call is
ignored.
4) For minority of safekeepers which could have missed creation insert
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
because response to cplane is sent only after it has happened, and cplane
retries the call until 200 response.
There is a small question how request handler (timeline creation in this
case) would interact with per sk reconciler. As always I prefer to do the
simplest possible thing and here it seems to be just waking it up so it
re-reads the db for work to do. Passing work in memory is faster, but
that shouldn't matter, and path to scan db for work will exist anyway,
simpler to reuse it.
For pg version / wal segment size: while we may persist them in `timelines`
table, it is not necessary as initial creation at step 3 can take them from
pageserver or cplane creation call and later pull_timeline will carry them
around.
Timeline migration.
1) CAS to the db to create joint conf, and in the same transaction create
`safekeeper_timeline_pending_ops` `include` entries to initialize new members
as well as deliver this conf to current ones; poke per sk reconcilers to work
on it. Also any conf change should also poke cplane notifier task(s).
2) Once it becomes possible per alg description above, get out of joint conf
with another CAS. Task should get wakeups from per sk reconcilers because
conf switch is required for advancement; however retries should be sleep
based as well as LSN advancement might be needed, though in happy path
it isn't. To see whether further transition is possible on wakup migration
executor polls safekeepers per the algorithm. CAS creating new conf with only
new members should again insert entries to `safekeeper_timeline_pending_ops`
to switch them there, as well as `exclude` rows to remove timeline from
old members.
Timeline deletion: just set `deleted_at` on the timeline row and insert
`safekeeper_timeline_pending_ops` entries in the same xact, the rest is done by
per sk reconcilers.
When node is removed (set to `decomissioned`), `safekeeper_timeline_pending_ops`
for it must be cleared in the same transaction.
One more task pool should infinitely retry notifying control plane about changed
safekeeper sets (trying making `cplane_notified_generation` equal `generation`).
#### Dealing with multiple instances of storage_controller
Operations described above executed concurrently might create some errors but do
not prevent progress, so while we normally don't want to run multiple instances
of storage_controller it is fine to have it temporarily, e.g. during redeploy.
To harden against some controller instance creating some work in
`safekeeper_timeline_pending_ops` and then disappearing without anyone pickup up
the job per sk reconcilers apart from explicit wakups should scan for work
periodically. It is possible to remove that though if all db updates are
protected with leadership token/term -- then such scans are needed only after
leadership is acquired.
Any interactions with db update in-memory controller state, e.g. if migration
request failed because different one is in progress, controller remembers that
and tries to finish it.
@@ -662,7 +545,7 @@ Aurora does this but similarly I don't think this is needed.
We should use Compute <-> safekeeper protocol change to include other (long
yearned) modifications:
- send data in network order without putting whole structs to be arch independent
- send data in network order to make arm work.
- remove term_start_lsn from AppendRequest
- add horizon to TermHistory
- add to ProposerGreeting number of connection from this wp to sk

View File

@@ -204,16 +204,14 @@ impl RemoteExtSpec {
// Check if extension is present in public or custom.
// If not, then it is not allowed to be used by this compute.
if !self
.public_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
&& !self
.custom_extensions
.as_ref()
.is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
{
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
if let Some(public_extensions) = &self.public_extensions {
if !public_extensions.contains(&real_ext_name.to_string()) {
if let Some(custom_extensions) = &self.custom_extensions {
if !custom_extensions.contains(&real_ext_name.to_string()) {
return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
}
}
}
}
match self.extension_data.get(real_ext_name) {
@@ -342,102 +340,6 @@ mod tests {
use super::*;
use std::fs::File;
#[test]
fn allow_installing_remote_extensions() {
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": null,
"custom_extensions": null,
"library_index": {},
"extension_data": {},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": null,
"library_index": {},
"extension_data": {},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": [],
"library_index": {
"ext": "ext"
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect_err("Extension should not be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": [],
"custom_extensions": ["ext"],
"library_index": {
"ext": "ext"
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": ["ext"],
"custom_extensions": [],
"library_index": {
"extlib": "ext",
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
// test library index for the case when library name
// doesn't match the extension name
rspec
.get_ext("extlib", true, "latest", "v17")
.expect("Library should be found");
}
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();

View File

@@ -1,37 +0,0 @@
[package]
name = "http-utils"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
backtrace.workspace = true
bytes.workspace = true
inferno.workspace = true
fail.workspace = true
flate2.workspace = true
hyper0.workspace = true
itertools.workspace = true
jemalloc_pprof.workspace = true
once_cell.workspace = true
pprof.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_path_to_error.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
tokio-util.workspace = true
url.workspace = true
uuid.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
metrics.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,50 +0,0 @@
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
/// Configure failpoints through http.
pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}

View File

@@ -94,7 +94,6 @@ pub struct ConfigToml {
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<reqwest::Url>,
pub control_plane_api_token: Option<String>,
pub control_plane_emergency_mode: bool,
@@ -122,7 +121,6 @@ pub struct ConfigToml {
pub wal_receiver_protocol: PostgresClientProtocol,
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -264,8 +262,6 @@ pub struct TenantConfigToml {
/// size exceeds `compaction_upper_limit * checkpoint_distance`.
pub compaction_upper_limit: usize,
pub compaction_algorithm: crate::models::CompactionAlgorithmSettings,
/// If true, compact down L0 across all tenant timelines before doing regular compaction.
pub compaction_l0_first: bool,
/// Level0 delta layer threshold at which to delay layer flushes for compaction backpressure,
/// such that they take 2x as long, and start waiting for layer flushes during ephemeral layer
/// rolls. This helps compaction keep up with WAL ingestion, and avoids read amplification
@@ -327,10 +323,6 @@ pub struct TenantConfigToml {
// Expresed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
// How many multiples of L0 `compaction_threshold` will preempt image layer creation and do L0 compaction.
// Set to 0 to disable preemption.
pub image_creation_preempt_threshold: usize,
/// The length for an explicit LSN lease request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
@@ -474,7 +466,6 @@ impl Default for ConfigToml {
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
use_compaction_semaphore: false,
control_plane_api: (None),
control_plane_api_token: (None),
@@ -495,7 +486,7 @@ impl Default for ConfigToml {
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: (DEFAULT_IMAGE_COMPRESSION),
timeline_offloading: true,
timeline_offloading: false,
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_io_mode: None,
@@ -515,11 +506,6 @@ impl Default for ConfigToml {
} else {
GetVectoredConcurrentIo::SidecarTask
},
enable_read_path_debugging: if cfg!(test) || cfg!(feature = "testing") {
Some(true)
} else {
None
},
}
}
}
@@ -547,7 +533,6 @@ pub mod tenant_conf_defaults {
// most of our pageservers. Compaction ~50 layers requires about 2GB memory (could be reduced later by optimizing L0 hole
// calculation to avoid loading all keys into the memory). So with this config, we can get a maximum peak compaction usage of 18GB.
pub const DEFAULT_COMPACTION_UPPER_LIMIT: usize = 50;
pub const DEFAULT_COMPACTION_L0_FIRST: bool = false;
pub const DEFAULT_COMPACTION_ALGORITHM: crate::models::CompactionAlgorithm =
crate::models::CompactionAlgorithm::Legacy;
@@ -562,10 +547,6 @@ pub mod tenant_conf_defaults {
// Relevant: https://github.com/neondatabase/neon/issues/3394
pub const DEFAULT_GC_PERIOD: &str = "1 hr";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
// If there are more than threshold * compaction_threshold (that is 3 * 10 in the default config) L0 layers, image
// layer creation will end immediately. Set to 0 to disable. The target default will be 3 once we
// want to enable this feature.
pub const DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD: usize = 0;
pub const DEFAULT_PITR_INTERVAL: &str = "7 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
@@ -597,7 +578,6 @@ impl Default for TenantConfigToml {
compaction_algorithm: crate::models::CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
},
compaction_l0_first: DEFAULT_COMPACTION_L0_FIRST,
l0_flush_delay_threshold: None,
l0_flush_stall_threshold: None,
l0_flush_wait_upload: DEFAULT_L0_FLUSH_WAIT_UPLOAD,
@@ -625,10 +605,9 @@ impl Default for TenantConfigToml {
lazy_slru_download: false,
timeline_get_throttle: crate::models::ThrottleConfig::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
image_creation_preempt_threshold: DEFAULT_IMAGE_CREATION_PREEMPT_THRESHOLD,
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
timeline_offloading: true,
timeline_offloading: false,
wal_receiver_protocol_override: None,
rel_size_v2_enabled: None,
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,

View File

@@ -464,8 +464,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_algorithm: FieldPatch<CompactionAlgorithmSettings>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub compaction_l0_first: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_delay_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub l0_flush_stall_threshold: FieldPatch<usize>,
@@ -500,8 +498,6 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_layer_creation_check_threshold: FieldPatch<u8>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_creation_preempt_threshold: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub lsn_lease_length: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub lsn_lease_length_for_ts: FieldPatch<String>,
@@ -531,7 +527,6 @@ pub struct TenantConfig {
pub compaction_upper_limit: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
pub compaction_l0_first: Option<bool>,
pub l0_flush_delay_threshold: Option<usize>,
pub l0_flush_stall_threshold: Option<usize>,
pub l0_flush_wait_upload: Option<bool>,
@@ -549,7 +544,6 @@ pub struct TenantConfig {
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub image_creation_preempt_threshold: Option<usize>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
pub timeline_offloading: Option<bool>,
@@ -570,7 +564,6 @@ impl TenantConfig {
mut compaction_threshold,
mut compaction_upper_limit,
mut compaction_algorithm,
mut compaction_l0_first,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
@@ -588,7 +581,6 @@ impl TenantConfig {
mut lazy_slru_download,
mut timeline_get_throttle,
mut image_layer_creation_check_threshold,
mut image_creation_preempt_threshold,
mut lsn_lease_length,
mut lsn_lease_length_for_ts,
mut timeline_offloading,
@@ -610,7 +602,6 @@ impl TenantConfig {
.compaction_upper_limit
.apply(&mut compaction_upper_limit);
patch.compaction_algorithm.apply(&mut compaction_algorithm);
patch.compaction_l0_first.apply(&mut compaction_l0_first);
patch
.l0_flush_delay_threshold
.apply(&mut l0_flush_delay_threshold);
@@ -644,9 +635,6 @@ impl TenantConfig {
patch
.image_layer_creation_check_threshold
.apply(&mut image_layer_creation_check_threshold);
patch
.image_creation_preempt_threshold
.apply(&mut image_creation_preempt_threshold);
patch.lsn_lease_length.apply(&mut lsn_lease_length);
patch
.lsn_lease_length_for_ts
@@ -674,7 +662,6 @@ impl TenantConfig {
compaction_threshold,
compaction_upper_limit,
compaction_algorithm,
compaction_l0_first,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
@@ -692,7 +679,6 @@ impl TenantConfig {
lazy_slru_download,
timeline_get_throttle,
image_layer_creation_check_threshold,
image_creation_preempt_threshold,
lsn_lease_length,
lsn_lease_length_for_ts,
timeline_offloading,

View File

@@ -76,15 +76,7 @@ impl Conf {
let mut cmd = Command::new(path);
cmd.env_clear()
.env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
);
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
Ok(cmd)
}

View File

@@ -64,14 +64,6 @@ pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> {
.env_clear()
.env("LD_LIBRARY_PATH", library_search_path)
.env("DYLD_LIBRARY_PATH", library_search_path)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdin(std::process::Stdio::null())
// stdout invocation produces the same output every time, we don't need it
.stdout(std::process::Stdio::null())

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
impl RemoteStorageConfig {
/// Helper to fetch the configured concurrency limit.
pub fn concurrency_limit(&self) -> usize {
pub fn concurrency_limit(&self) -> Option<usize> {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
}
}
}

View File

@@ -65,12 +65,6 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Here, a limit of max 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit.
///
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

View File

@@ -21,17 +21,23 @@ bytes.workspace = true
camino.workspace = true
chrono.workspace = true
diatomic-waker.workspace = true
flate2.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
hyper0 = { workspace = true, features = ["full"] }
inferno.workspace = true
itertools.workspace = true
fail.workspace = true
futures = { workspace = true }
jemalloc_pprof.workspace = true
jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
pprof.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
@@ -48,6 +54,8 @@ rand.workspace = true
scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid.workspace = true
walkdir.workspace = true
pq_proto.workspace = true
@@ -56,6 +64,12 @@ metrics.workspace = true
const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
serde_path_to_error.workspace = true
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -39,7 +39,7 @@ function initdb_with_args {
;;
esac
eval env -i LD_LIBRARY_PATH="$PG_BIN"/../lib ASAN_OPTIONS="${ASAN_OPTIONS-}" UBSAN_OPTIONS="${UBSAN_OPTIONS-}" "${cmd[*]}"
eval env -i LD_LIBRARY_PATH="$PG_BIN"/../lib "${cmd[*]}"
}
rm -fr "$DATA_DIR"

View File

@@ -10,7 +10,7 @@ use jsonwebtoken::{
};
use serde::{Deserialize, Serialize};
use crate::id::TenantId;
use crate::{http::error::ApiError, id::TenantId};
/// Algorithm to use. We require EdDSA.
const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
@@ -90,6 +90,15 @@ impl Display for AuthError {
}
}
impl From<AuthError> for ApiError {
fn from(_value: AuthError) -> Self {
// Don't pass on the value of the AuthError as a precautionary measure.
// Being intentionally vague in public error communication hurts debugability
// but it is more secure.
ApiError::Forbidden("JWT authentication error".to_string())
}
}
pub struct JwtAuth {
decoding_keys: Vec<DecodingKey>,
validation: Validation,

View File

@@ -1,5 +1,4 @@
use std::fmt::{Debug, Display};
use std::time::Duration;
use futures::Future;
use tokio_util::sync::CancellationToken;
@@ -30,11 +29,6 @@ pub async fn exponential_backoff(
}
}
pub fn exponential_backoff_duration(n: u32, base_increment: f64, max_seconds: f64) -> Duration {
let seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds);
Duration::from_secs_f64(seconds)
}
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0

View File

@@ -1,6 +1,13 @@
//! Failpoint support code shared between pageserver and safekeepers.
use crate::http::{
error::ApiError,
json::{json_request, json_response},
};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
@@ -177,3 +184,45 @@ fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
/// Configure failpoints through http.
pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}

View File

@@ -1,6 +1,7 @@
use crate::error::{api_error_handler, route_error_handler, ApiError};
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use crate::http::request::{get_query_param, parse_query_param};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use ::pprof::protos::Message as _;
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
@@ -18,7 +19,6 @@ use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{debug, info, info_span, warn, Instrument};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use std::future::Future;
use std::io::Write as _;
@@ -718,9 +718,9 @@ pub fn check_permission_with(
#[cfg(test)]
mod tests {
use super::*;
use futures::future::poll_fn;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
#[tokio::test]

View File

@@ -5,8 +5,6 @@ use std::error::Error as StdError;
use thiserror::Error;
use tracing::{error, info, warn};
use utils::auth::AuthError;
#[derive(Debug, Error)]
pub enum ApiError {
#[error("Bad request: {0:#?}")]
@@ -98,15 +96,6 @@ impl ApiError {
}
}
impl From<AuthError> for ApiError {
fn from(_value: AuthError) -> Self {
// Don't pass on the value of the AuthError as a precautionary measure.
// Being intentionally vague in public error communication hurts debugability
// but it is more secure.
ApiError::Forbidden("JWT authentication error".to_string())
}
}
#[derive(Serialize, Deserialize)]
pub struct HttpErrorBody {
pub msg: String,

View File

@@ -1,12 +1,8 @@
pub mod endpoint;
pub mod error;
pub mod failpoints;
pub mod json;
pub mod pprof;
pub mod request;
extern crate hyper0 as hyper;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};

View File

@@ -2,6 +2,8 @@
//! between other crates in this repository.
#![deny(clippy::undocumented_unsafe_blocks)]
extern crate hyper0 as hyper;
pub mod backoff;
/// `Lsn` type implements common tasks on Log Sequence Numbers
@@ -31,6 +33,9 @@ pub mod shard;
mod hex;
pub use hex::Hex;
// http endpoint utils
pub mod http;
// definition of the Generation type for pageserver attachment APIs
pub mod generation;
@@ -91,6 +96,8 @@ pub mod circuit_breaker;
pub mod try_rcu;
pub mod pprof;
pub mod guard_arc_swap;
// Re-export used in macro. Avoids adding git-version as dep in target crates.

View File

@@ -5,27 +5,6 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
/// * Emit an ERROR log message with prefix "CRITICAL:" and a backtrace.
/// * Trigger a pageable alert (via the metric below).
/// * Increment libmetrics_tracing_event_count{level="critical"}, and indirectly level="error".
/// * In debug builds, panic the process.
///
/// When including errors in the message, please use {err:?} to include the error cause and original
/// backtrace.
#[macro_export]
macro_rules! critical {
($($arg:tt)*) => {{
if cfg!(debug_assertions) {
panic!($($arg)*);
}
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
let backtrace = std::backtrace::Backtrace::capture();
tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
}};
}
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
@@ -46,10 +25,7 @@ impl LogFormat {
}
}
pub struct TracingEventCountMetric {
/// CRITICAL is not a `tracing` log level. Instead, we increment it in the `critical!` macro,
/// and also emit it as a regular error. These are thus double-counted, but that seems fine.
critical: IntCounter,
struct TracingEventCountMetric {
error: IntCounter,
warn: IntCounter,
info: IntCounter,
@@ -57,7 +33,7 @@ pub struct TracingEventCountMetric {
trace: IntCounter,
}
pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
let vec = metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",
"Number of tracing events, by level",
@@ -70,7 +46,6 @@ pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new
impl TracingEventCountMetric {
fn new(vec: IntCounterVec) -> Self {
Self {
critical: vec.with_label_values(&["critical"]),
error: vec.with_label_values(&["error"]),
warn: vec.with_label_values(&["warn"]),
info: vec.with_label_values(&["info"]),
@@ -79,11 +54,6 @@ impl TracingEventCountMetric {
}
}
// Allow public access from `critical!` macro.
pub fn inc_critical(&self) {
self.critical.inc();
}
fn inc_for_level(&self, level: tracing::Level) {
let counter = match level {
tracing::Level::ERROR => &self.error,

View File

@@ -177,8 +177,8 @@ impl FileCacheState {
crate::spawn_with_cancel(
token,
|res| {
if let Err(e) = res {
error!(error = format_args!("{e:#}"), "postgres error");
if let Err(error) = res {
error!(%error, "postgres error")
}
},
conn,
@@ -205,7 +205,7 @@ impl FileCacheState {
{
Ok(rows) => Ok(rows),
Err(e) => {
error!(error = format_args!("{e:#}"), "postgres error -> retrying");
error!(error = ?e, "postgres error: {e} -> retrying");
let client = FileCacheState::connect(&self.conn_str, self.token.clone())
.await

View File

@@ -191,12 +191,15 @@ async fn start_monitor(
.await;
let mut monitor = match monitor {
Ok(Ok(monitor)) => monitor,
Ok(Err(e)) => {
error!(error = format_args!("{e:#}"), "failed to create monitor");
Ok(Err(error)) => {
error!(?error, "failed to create monitor");
return;
}
Err(_) => {
error!(?timeout, "creating monitor timed out");
error!(
?timeout,
"creating monitor timed out (probably waiting to receive protocol range)"
);
return;
}
};
@@ -204,9 +207,6 @@ async fn start_monitor(
match monitor.run().await {
Ok(()) => info!("monitor was killed due to new connection"),
Err(e) => error!(
error = format_args!("{e:#}"),
"monitor terminated unexpectedly"
),
Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
}
}

View File

@@ -370,16 +370,12 @@ impl Runner {
}),
InboundMsgKind::InvalidMessage { error } => {
warn!(
error = format_args!("{error:#}"),
id, "received notification of an invalid message we sent"
%error, id, "received notification of an invalid message we sent"
);
Ok(None)
}
InboundMsgKind::InternalError { error } => {
warn!(
error = format_args!("{error:#}"),
id, "agent experienced an internal error"
);
warn!(error, id, "agent experienced an internal error");
Ok(None)
}
InboundMsgKind::HealthCheck {} => {
@@ -480,7 +476,7 @@ impl Runner {
// gives the outermost cause, and the debug impl
// pretty-prints the error, whereas {:#} contains all the
// causes, but is compact (no newlines).
warn!(error = format_args!("{e:#}"), "error handling message");
warn!(error = format!("{e:#}"), "error handling message");
OutboundMsg::new(
OutboundMsgKind::InternalError {
error: e.to_string(),
@@ -496,7 +492,7 @@ impl Runner {
.context("failed to send message")?;
}
Err(e) => warn!(
error = format_args!("{e:#}"),
error = format!("{e}"),
msg = ?msg,
"received error message"
),

View File

@@ -79,7 +79,6 @@ pq_proto.workspace = true
remote_storage.workspace = true
storage_broker.workspace = true
tenant_size_model.workspace = true
http-utils.workspace = true
utils.workspace = true
workspace_hack.workspace = true
reqwest.workspace = true

View File

@@ -11,7 +11,6 @@ testing = [ "pageserver_api/testing" ]
pageserver_api.workspace = true
thiserror.workspace = true
reqwest = { workspace = true, features = [ "stream" ] }
http-utils.workspace = true
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,12 +1,11 @@
use std::{collections::HashMap, error::Error as _};
use bytes::Bytes;
use reqwest::{IntoUrl, Method, StatusCode};
use detach_ancestor::AncestorDetached;
use http_utils::error::HttpErrorBody;
use pageserver_api::{models::*, shard::TenantShardId};
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
lsn::Lsn,
};

View File

@@ -592,7 +592,7 @@ fn start_pageserver(
let router = http::make_router(router_state, launch_ts, http_auth.clone())?
.build()
.map_err(|err| anyhow!(err))?;
let service = http_utils::RouterService::new(router).unwrap();
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper0::Server::from_tcp(http_listener)?
.serve(service)
.with_graceful_shutdown({

View File

@@ -140,10 +140,6 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
/// If true, use a separate semaphore for compaction tasks instead of the common background task
/// semaphore. Defaults to false.
pub use_compaction_semaphore: bool,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
@@ -197,10 +193,6 @@ pub struct PageServerConf {
pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
pub get_vectored_concurrent_io: pageserver_api::config::GetVectoredConcurrentIo,
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,
}
/// Token for authentication to safekeepers
@@ -340,7 +332,6 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
@@ -364,7 +355,6 @@ impl PageServerConf {
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
} = config_toml;
let mut conf = PageServerConf {
@@ -395,7 +385,6 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
use_compaction_semaphore,
control_plane_api,
control_plane_emergency_mode,
heatmap_upload_concurrency,
@@ -451,7 +440,6 @@ impl PageServerConf {
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
};
// ------------------------------------------------------------

View File

@@ -8,6 +8,7 @@ use std::time::Duration;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::virtual_file::MaybeFatalIo;
@@ -462,18 +463,45 @@ impl DeletionQueueClient {
///
/// The `current_generation` is the generation of this pageserver's current attachment. The
/// generations in `layers` are the generations in which those layers were written.
pub(crate) fn push_layers(
pub(crate) async fn push_layers(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
// None generations are not valid for attached tenants: they must always be attached in
// a known generation. None generations are still permitted for layers in the index because
// they may be historical.
assert!(!current_generation.is_none());
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
let mut layer_paths = Vec::new();
for (layer, meta) in layers {
layer_paths.push(remote_layer_path(
&tenant_shard_id.tenant_id,
&timeline_id,
meta.shard,
&layer,
meta.generation,
));
}
self.push_immediate(layer_paths).await?;
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -929,12 +957,14 @@ mod test {
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
info!("Pushing");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)
.await?;
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
assert_local_files(&[], &deletion_prefix);
@@ -987,12 +1017,14 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// We enqueued the operation in a stale generation: it should have failed validation
tracing::debug!("Flushing...");
@@ -1000,12 +1032,14 @@ mod test {
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
tracing::debug!("Pushing...");
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// We enqueued the operation in a fresh generation: it should have passed validation
tracing::debug!("Flushing...");
@@ -1040,24 +1074,28 @@ mod test {
// generation gets that treatment)
let remote_layer_file_name_historical =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
// Inject a deletion in the generation before generation_now: after restart,
// this deletion should get executed, because we execute deletions in the
// immediately previous generation on the same node.
let remote_layer_file_name_previous =
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
client.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)?;
client
.push_layers(
tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
client.flush().await?;
assert_remote_files(
@@ -1101,7 +1139,6 @@ pub(crate) mod mock {
use tracing::info;
use super::*;
use crate::tenant::remote_timeline_client::remote_layer_path;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct ConsumerState {

View File

@@ -61,7 +61,6 @@ use crate::{
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
tasks::sleep_random,
},
CancellableTask, DiskUsageEvictionTask,
};
@@ -211,8 +210,14 @@ async fn disk_usage_eviction_task(
info!("disk usage based eviction task finishing");
};
if sleep_random(task_config.period, &cancel).await.is_err() {
return;
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
.await
.is_err()
{
return;
}
}
let mut iteration_no = 0;

View File

@@ -13,12 +13,6 @@ use enumset::EnumSet;
use futures::future::join_all;
use futures::StreamExt;
use futures::TryFutureExt;
use http_utils::endpoint::{
profile_cpu_handler, profile_heap_handler, prometheus_metrics_handler, request_span,
};
use http_utils::failpoints::failpoints_handler;
use http_utils::request::must_parse_query_param;
use http_utils::request::{get_request_param, must_get_query_param, parse_query_param};
use humantime::format_rfc3339;
use hyper::header;
use hyper::StatusCode;
@@ -66,6 +60,13 @@ use tokio::time::Instant;
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{
profile_cpu_handler, profile_heap_handler, prometheus_metrics_handler, request_span,
};
use utils::http::request::must_parse_query_param;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -103,13 +104,6 @@ use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use http_utils::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
json::{json_request, json_request_maybe, json_response},
request::parse_request_param,
RequestExt, RouterBuilder,
};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
@@ -117,6 +111,13 @@ use pageserver_api::models::{
use utils::{
auth::SwappableJwtAuth,
generation::Generation,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
json::{json_request, json_request_maybe, json_response},
request::parse_request_param,
RequestExt, RouterBuilder,
},
id::{TenantId, TimelineId},
lsn::Lsn,
};
@@ -560,7 +561,7 @@ async fn reload_auth_validation_keys_handler(
let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
match utils::auth::JwtAuth::from_key_path(key_path) {
match JwtAuth::from_key_path(key_path) {
Ok(new_auth) => {
shared_auth.swap(new_auth);
json_response(StatusCode::OK, ())
@@ -3392,17 +3393,7 @@ where
let status = response.status();
info!(%status, "Cancelled request finished successfully")
}
Err(e) => match e {
ApiError::ShuttingDown | ApiError::ResourceUnavailable(_) => {
// Don't log this at error severity: they are normal during lifecycle of tenants/process
info!("Cancelled request aborted for shutdown")
}
_ => {
// Log these in a highly visible way, because we have no client to send the response to, but
// would like to know that something went wrong.
error!("Cancelled request finished with an error: {e:?}")
}
},
Err(e) => error!("Cancelled request finished with an error: {e:?}"),
}
}
// only logging for cancelled panicked request handlers is the tracing_panic_hook,

View File

@@ -263,6 +263,14 @@ pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
pub fn is_temporary(path: &Utf8Path) -> bool {
match path.file_name() {
Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
@@ -270,6 +278,25 @@ pub fn is_temporary(path: &Utf8Path) -> bool {
}
}
fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
match path.file_name() {
Some(name) => name.ends_with(suffix),
None => false,
}
}
// FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
// from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
// from the name.
pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}
pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}
/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///

View File

@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use enum_map::{Enum as _, EnumMap};
use enum_map::EnumMap;
use futures::Future;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
@@ -32,7 +32,6 @@ use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext};
use crate::pgdatadir_mapping::DatadirModificationStats;
use crate::task_mgr::TaskKind;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::mgr::TenantSlot;
@@ -104,7 +103,7 @@ pub(crate) static STORAGE_TIME_COUNT_PER_TIMELINE: Lazy<IntCounterVec> = Lazy::n
.expect("failed to define a metric")
});
// Buckets for background operation duration in seconds, like compaction, GC, size calculation.
// Buckets for background operations like compaction, GC, size calculation
const STORAGE_OP_BUCKETS: &[f64] = &[0.010, 0.100, 1.0, 10.0, 100.0, 1000.0];
pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
@@ -236,7 +235,7 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
GetVectoredLatency {
map: EnumMap::from_array(std::array::from_fn(|task_kind_idx| {
let task_kind = TaskKind::from_usize(task_kind_idx);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind_idx);
if GetVectoredLatency::TRACKED_TASK_KINDS.contains(&task_kind) {
let task_kind = task_kind.into();
@@ -259,7 +258,7 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
ScanLatency {
map: EnumMap::from_array(std::array::from_fn(|task_kind_idx| {
let task_kind = TaskKind::from_usize(task_kind_idx);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind_idx);
if ScanLatency::TRACKED_TASK_KINDS.contains(&task_kind) {
let task_kind = task_kind.into();
@@ -300,10 +299,10 @@ static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
map: EnumMap::from_array(std::array::from_fn(|task_kind| {
let task_kind = TaskKind::from_usize(task_kind);
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind);
let task_kind: &'static str = task_kind.into();
EnumMap::from_array(std::array::from_fn(|content_kind| {
let content_kind = PageContentKind::from_usize(content_kind);
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_immutable: {
@@ -1366,7 +1365,10 @@ impl SmgrOpTimer {
/// The first callers receives Some, subsequent ones None.
///
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_end(&mut self, at: Instant) -> Option<SmgrOpFlushInProgress> {
pub(crate) fn observe_execution_end_flush_start(
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
// NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
@@ -1400,6 +1402,7 @@ impl SmgrOpTimer {
..
} = inner;
Some(SmgrOpFlushInProgress {
flush_started_at: at,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
})
@@ -1415,6 +1418,7 @@ impl SmgrOpTimer {
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
/// and remove this struct from the code base.
pub(crate) struct SmgrOpFlushInProgress {
flush_started_at: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
@@ -1433,13 +1437,12 @@ impl Drop for SmgrOpTimer {
self.observe_throttle_start(now);
self.observe_throttle_done(ThrottleResult::NotThrottled { end: now });
self.observe_execution_start(now);
let maybe_flush_timer = self.observe_execution_end(now);
drop(maybe_flush_timer);
self.observe_execution_end_flush_start(now);
}
}
impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(self, mut started_at: Instant, mut fut: Fut) -> O
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
@@ -1451,12 +1454,12 @@ impl SmgrOpFlushInProgress {
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - started_at;
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
started_at = now;
self.flush_started_at = now;
},
|mut observe| {
observe();
@@ -1909,7 +1912,7 @@ pub(crate) static COMPUTE_COMMANDS_COUNTERS: Lazy<ComputeCommandCounters> = Lazy
ComputeCommandCounters {
map: EnumMap::from_array(std::array::from_fn(|i| {
let command = ComputeCommandKind::from_usize(i);
let command = <ComputeCommandKind as enum_map::Enum>::from_usize(i);
let command_str: &'static str = command.into();
inner.with_label_values(&[command_str])
})),
@@ -2209,13 +2212,11 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
pub struct BackgroundLoopSemaphoreMetrics {
counters: EnumMap<BackgroundLoopKind, IntCounterPair>,
durations: EnumMap<BackgroundLoopKind, Histogram>,
waiting_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
running_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
durations: EnumMap<BackgroundLoopKind, Counter>,
}
pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics> =
Lazy::new(|| {
pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics> = Lazy::new(
|| {
let counters = register_int_counter_pair_vec!(
"pageserver_background_loop_semaphore_wait_start_count",
"Counter for background loop concurrency-limiting semaphore acquire calls started",
@@ -2225,101 +2226,45 @@ pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics
)
.unwrap();
let durations = register_histogram_vec!(
"pageserver_background_loop_semaphore_wait_seconds",
"Seconds spent waiting on background loop semaphore acquisition",
&["task"],
vec![0.01, 1.0, 5.0, 10.0, 30.0, 60.0, 180.0, 300.0, 600.0],
)
.unwrap();
let waiting_tasks = register_int_gauge_vec!(
"pageserver_background_loop_semaphore_waiting_tasks",
"Number of background loop tasks waiting for semaphore",
&["task"],
)
.unwrap();
let running_tasks = register_int_gauge_vec!(
"pageserver_background_loop_semaphore_running_tasks",
"Number of background loop tasks running concurrently",
let durations = register_counter_vec!(
"pageserver_background_loop_semaphore_wait_duration_seconds",
"Sum of wall clock time spent waiting on the background loop concurrency-limiting semaphore acquire calls",
&["task"],
)
.unwrap();
BackgroundLoopSemaphoreMetrics {
counters: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
counters: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
counters.with_label_values(&[kind.into()])
})),
durations: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
durations: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
durations.with_label_values(&[kind.into()])
})),
waiting_tasks: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
waiting_tasks.with_label_values(&[kind.into()])
})),
running_tasks: EnumMap::from_array(std::array::from_fn(|i| {
let kind = BackgroundLoopKind::from_usize(i);
running_tasks.with_label_values(&[kind.into()])
})),
}
});
},
);
impl BackgroundLoopSemaphoreMetrics {
/// Starts recording semaphore metrics. Call `acquired()` on the returned recorder when the
/// semaphore is acquired, and drop it when the task completes or is cancelled.
pub(crate) fn record(
&self,
task: BackgroundLoopKind,
) -> BackgroundLoopSemaphoreMetricsRecorder {
BackgroundLoopSemaphoreMetricsRecorder::start(self, task)
}
}
/// Records metrics for a background task.
pub struct BackgroundLoopSemaphoreMetricsRecorder<'a> {
metrics: &'a BackgroundLoopSemaphoreMetrics,
task: BackgroundLoopKind,
start: Instant,
wait_counter_guard: Option<metrics::IntCounterPairGuard>,
}
impl<'a> BackgroundLoopSemaphoreMetricsRecorder<'a> {
/// Starts recording semaphore metrics, by recording wait time and incrementing
/// `wait_start_count` and `waiting_tasks`.
fn start(metrics: &'a BackgroundLoopSemaphoreMetrics, task: BackgroundLoopKind) -> Self {
metrics.waiting_tasks[task].inc();
Self {
metrics,
task,
start: Instant::now(),
wait_counter_guard: Some(metrics.counters[task].guard()),
pub(crate) fn measure_acquisition(&self, task: BackgroundLoopKind) -> impl Drop + '_ {
struct Record<'a> {
metrics: &'a BackgroundLoopSemaphoreMetrics,
task: BackgroundLoopKind,
_counter_guard: metrics::IntCounterPairGuard,
start: Instant,
}
}
/// Signals that the semaphore has been acquired, and updates relevant metrics.
pub fn acquired(&mut self) -> Duration {
let waited = self.start.elapsed();
self.wait_counter_guard.take().expect("already acquired");
self.metrics.durations[self.task].observe(waited.as_secs_f64());
self.metrics.waiting_tasks[self.task].dec();
self.metrics.running_tasks[self.task].inc();
waited
}
}
impl Drop for BackgroundLoopSemaphoreMetricsRecorder<'_> {
/// The task either completed or was cancelled.
fn drop(&mut self) {
if self.wait_counter_guard.take().is_some() {
// Waiting.
self.metrics.durations[self.task].observe(self.start.elapsed().as_secs_f64());
self.metrics.waiting_tasks[self.task].dec();
} else {
// Running.
self.metrics.running_tasks[self.task].dec();
impl Drop for Record<'_> {
fn drop(&mut self) {
let elapsed = self.start.elapsed().as_secs_f64();
self.metrics.durations[self.task].inc_by(elapsed);
}
}
Record {
metrics: self,
task,
_counter_guard: self.counters[task].guard(),
start: Instant::now(),
}
}
}
@@ -2433,40 +2378,11 @@ pub(crate) struct WalIngestMetrics {
pub(crate) records_observed: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) values_committed_metadata_images: IntCounter,
pub(crate) values_committed_metadata_deltas: IntCounter,
pub(crate) values_committed_data_images: IntCounter,
pub(crate) values_committed_data_deltas: IntCounter,
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
}
impl WalIngestMetrics {
pub(crate) fn inc_values_committed(&self, stats: &DatadirModificationStats) {
if stats.metadata_images > 0 {
self.values_committed_metadata_images
.inc_by(stats.metadata_images);
}
if stats.metadata_deltas > 0 {
self.values_committed_metadata_deltas
.inc_by(stats.metadata_deltas);
}
if stats.data_images > 0 {
self.values_committed_data_images.inc_by(stats.data_images);
}
if stats.data_deltas > 0 {
self.values_committed_data_deltas.inc_by(stats.data_deltas);
}
}
pub(crate) clear_vm_bits_unknown: IntCounterVec,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
let values_committed = register_int_counter_vec!(
"pageserver_wal_ingest_values_committed",
"Number of values committed to pageserver storage from WAL records",
&["class", "kind"],
)
.expect("failed to define a metric");
WalIngestMetrics {
bytes_received: register_int_counter!(
"pageserver_wal_ingest_bytes_received",
@@ -2493,15 +2409,17 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
values_committed_data_deltas: values_committed.with_label_values(&["data", "delta"]),
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
"pageserver_gap_blocks_zeroed_on_rel_extend",
"Total number of zero gap blocks written on relation extends"
)
.expect("failed to define a metric"),
clear_vm_bits_unknown: register_int_counter_vec!(
"pageserver_wal_ingest_clear_vm_bits_unknown",
"Number of ignored ClearVmBits operations due to unknown pages/relations",
&["entity"],
)
.expect("failed to define a metric"),
}
});
@@ -2568,7 +2486,7 @@ pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> =
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) active_stderr_logger_tasks_started: IntCounter,
pub(crate) active_stderr_logger_tasks_finished: IntCounter,
}
@@ -2610,7 +2528,7 @@ impl Default for WalRedoProcessCounters {
Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
let cause = WalRedoKillCause::from_usize(i);
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),

View File

@@ -489,6 +489,7 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrappe
let timeline = tenant_shard
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
set_tracing_field_shard_id(&timeline);
Ok(timeline)
}
}
@@ -773,11 +774,11 @@ impl PageServerHandler {
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
@@ -792,10 +793,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
@@ -810,10 +812,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
@@ -828,10 +831,11 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
@@ -846,20 +850,12 @@ impl PageServerHandler {
}
}
PagestreamFeMessage::GetPage(req) => {
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
macro_rules! mkspan {
(before shard routing) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
}};
($shard_id:expr) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
}};
}
let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
macro_rules! respond_error {
($span:expr, $error:expr) => {{
($error:expr) => {{
let error = BatchedFeMessage::RespondError {
span: $span,
span,
error: BatchedPageStreamError {
req: req.hdr,
err: $error,
@@ -872,35 +868,27 @@ impl PageServerHandler {
let key = rel_block_to_key(req.rel, req.blkno);
let shard = match timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone()) // sets `shard_id` field
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
));
}
Err(e) => {
let span = mkspan!(before shard routing);
match e {
GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return respond_error!(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
)
);
}
e => {
return respond_error!(span, e.into());
}
}
return respond_error!(e.into());
}
};
let span = mkspan!(shard.tenant_shard_id.shard_slug());
let timer = record_op_start_and_throttle(
&shard,
@@ -922,7 +910,7 @@ impl PageServerHandler {
{
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
return respond_error!(e);
}
};
BatchedFeMessage::GetPage {
@@ -934,10 +922,11 @@ impl PageServerHandler {
}
#[cfg(feature = "testing")]
PagestreamFeMessage::Test(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_test_request");
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
let timer =
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
.await?;
@@ -1074,7 +1063,7 @@ impl PageServerHandler {
};
// invoke handler function
let (mut handler_results, span): (
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
_,
) = match batch {
@@ -1201,49 +1190,11 @@ impl PageServerHandler {
}
};
// We purposefully don't count flush time into the smgr operation timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
//
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
let flush_timers = {
let flushing_start_time = Instant::now();
let mut flush_timers = Vec::with_capacity(handler_results.len());
for handler_result in &mut handler_results {
let flush_timer = match handler_result {
Ok((_, timer)) => Some(
timer
.observe_execution_end(flushing_start_time)
.expect("we are the first caller"),
),
Err(_) => {
// TODO: measure errors
None
}
};
flush_timers.push(flush_timer);
}
assert_eq!(flush_timers.len(), handler_results.len());
flush_timers
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
let response_msg = match handler_result {
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1267,14 +1218,16 @@ impl PageServerHandler {
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
}),
None, // TODO: measure errors
)
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
Ok((response_msg, timer)) => (response_msg, Some(timer)),
};
//
@@ -1285,12 +1238,30 @@ impl PageServerHandler {
&response_msg.serialize(protocol_version),
))?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
@@ -1309,6 +1280,8 @@ impl PageServerHandler {
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())
@@ -1369,7 +1342,7 @@ impl PageServerHandler {
.take()
.expect("implementation error: timeline_handles should not be locked");
let request_span = info_span!("request");
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
self.handle_pagerequests_pipelined(
@@ -1719,7 +1692,7 @@ impl PageServerHandler {
// to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.lsn_covered_by_lease(request_lsn) {
if !gc_info.leases.contains_key(&request_lsn) {
return Err(
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
@@ -2063,13 +2036,6 @@ impl PageServerHandler {
.unwrap()
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
set_tracing_field_shard_id(&timeline);
if timeline.is_archived() == Some(true) {
// TODO after a grace period, turn this log line into a hard error
tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
//return Err(QueryError::NotFound("timeline is archived".into()))
}
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {

View File

@@ -48,7 +48,7 @@ use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use wal_decoder::serialized_batch::SerializedValueBatch;
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -612,18 +612,11 @@ impl Timeline {
pausable_failpoint!("find-lsn-for-timestamp-pausable");
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let gc_cutoff_planned = {
let gc_info = self.gc_info.read().unwrap();
gc_info.min_cutoff()
};
// Usually the planned cutoff is newer than the cutoff of the last gc run,
// but let's be defensive.
let gc_cutoff = gc_cutoff_planned.max(*gc_cutoff_lsn_guard);
// We use this method to figure out the branching LSN for the new branch, but the
// GC cutoff could be before the branching point and we cannot create a new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
// on the safe side.
let min_lsn = std::cmp::max(gc_cutoff, self.get_ancestor_lsn());
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
@@ -1304,26 +1297,6 @@ impl DatadirModification<'_> {
.is_some_and(|b| b.has_data())
}
/// Returns statistics about the currently pending modifications.
pub(crate) fn stats(&self) -> DatadirModificationStats {
let mut stats = DatadirModificationStats::default();
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
match value {
Value::Image(_) => stats.metadata_images += 1,
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
Value::WalRecord(_) => stats.metadata_deltas += 1,
}
}
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
match valuemeta {
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
ValueMeta::Serialized(_) => stats.data_deltas += 1,
ValueMeta::Observed(_) => {}
}
}
stats
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -2344,15 +2317,6 @@ impl DatadirModification<'_> {
}
}
/// Statistics for a DatadirModification.
#[derive(Default)]
pub struct DatadirModificationStats {
pub metadata_images: u64,
pub metadata_deltas: u64,
pub data_images: u64,
pub data_deltas: u64,
}
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
///

View File

@@ -328,8 +328,8 @@ pub enum TaskKind {
// Eviction. One per timeline.
Eviction,
// Tenant housekeeping (flush idle ephemeral layers, shut down idle walredo, etc.).
TenantHousekeeping,
// Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
IngestHousekeeping,
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,

View File

@@ -20,7 +20,6 @@ use chrono::NaiveDateTime;
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::Itertools as _;
use pageserver_api::models;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::models::LsnLease;
@@ -47,18 +46,14 @@ use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::CompactionOutcome;
use timeline::compaction::GcCompactionQueue;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::offload::OffloadError;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -100,6 +95,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::CONCURRENT_INITDBS;
use crate::metrics::INITDB_RUN_TIME;
@@ -353,9 +349,6 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
pub(crate) l0_compaction_trigger: Arc<Notify>,
/// Scheduled gc-compaction tasks.
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
@@ -1697,7 +1690,12 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
self.get_timeline_resources_for(remote_client),
TimelineResources {
remote_client,
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
LoadTimelineCause::Attach,
ctx,
)
@@ -1795,7 +1793,11 @@ impl Tenant {
let entry = entry.context("read timeline dir entry")?;
let entry_path = entry.path();
let purge = if crate::is_temporary(entry_path) {
let purge = if crate::is_temporary(entry_path)
// TODO: remove uninit mark code (https://github.com/neondatabase/neon/issues/5718)
|| is_uninit_mark(entry_path)
|| crate::is_delete_mark(entry_path)
{
true
} else {
match TimelineId::try_from(entry_path.file_name()) {
@@ -2900,194 +2902,146 @@ impl Tenant {
.await
}
/// Performs one compaction iteration. Called periodically from the compaction loop. Returns
/// whether another compaction is needed, if we still have pending work or if we yield for
/// immediate L0 compaction.
/// Perform one compaction iteration.
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
///
/// Compaction can also be explicitly requested for a timeline via the HTTP API.
/// Returns whether we have pending compaction task.
async fn compaction_iteration(
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<CompactionOutcome, CompactionError> {
// Don't compact inactive tenants.
) -> Result<bool, timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(CompactionOutcome::Skipped);
return Ok(false);
}
// Don't compact tenants that can't upload layers. We don't check `may_delete_layers_hint`,
// since we need to compact L0 even in AttachedMulti to bound read amplification.
let location = self.tenant_conf.load().location;
if !location.may_upload_layers_hint() {
info!("skipping compaction in location state {location:?}");
return Ok(CompactionOutcome::Skipped);
}
// Don't compact if the circuit breaker is tripped.
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
info!("skipping compaction due to previous failures");
return Ok(CompactionOutcome::Skipped);
}
// Collect all timelines to compact, along with offload instructions and L0 counts.
let mut compact: Vec<Arc<Timeline>> = Vec::new();
let mut offload: HashSet<TimelineId> = HashSet::new();
let mut l0_counts: HashMap<TimelineId, usize> = HashMap::new();
{
let offload_enabled = self.get_timeline_offloading_enabled();
let conf = self.tenant_conf.load();
// Note that compaction usually requires deletions, but we don't respect
// may_delete_layers_hint here: that is because tenants in AttachedMulti
// should proceed with compaction even if they can't do deletion, to avoid
// accumulating dangerously deep stacks of L0 layers. Deletions will be
// enqueued inside RemoteTimelineClient, and executed layer if/when we transition
// to AttachedSingle state.
if !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(false);
}
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines_to_compact_or_offload;
{
let timelines = self.timelines.lock().unwrap();
for (&timeline_id, timeline) in timelines.iter() {
// Skip inactive timelines.
if !timeline.is_active() {
continue;
}
// Schedule the timeline for compaction.
compact.push(timeline.clone());
// Schedule the timeline for offloading if eligible.
let can_offload = offload_enabled
&& timeline.can_offload().0
&& !timelines
.iter()
.any(|(_, tli)| tli.get_ancestor_timeline_id() == Some(timeline_id));
if can_offload {
offload.insert(timeline_id);
}
}
} // release timelines lock
for timeline in &compact {
// Collect L0 counts. Can't await while holding lock above.
if let Ok(lm) = timeline.layers.read().await.layer_map() {
l0_counts.insert(timeline.timeline_id, lm.level0_deltas().len());
}
}
// Pass 1: L0 compaction across all timelines, in order of L0 count. We prioritize this to
// bound read amplification.
//
// TODO: this may spin on one or more ingest-heavy timelines, starving out image/GC
// compaction and offloading. We leave that as a potential problem to solve later. Consider
// splitting L0 and image/GC compaction to separate background jobs.
if self.get_compaction_l0_first() {
let compaction_threshold = self.get_compaction_threshold();
let compact_l0 = compact
timelines_to_compact_or_offload = timelines
.iter()
.map(|tli| (tli, l0_counts.get(&tli.timeline_id).copied().unwrap_or(0)))
.filter(|&(_, l0)| l0 >= compaction_threshold)
.sorted_by_key(|&(_, l0)| l0)
.rev()
.map(|(tli, _)| tli.clone())
.collect_vec();
let mut has_pending_l0 = false;
for timeline in compact_l0 {
let outcome = timeline
.compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
match outcome {
CompactionOutcome::Done => {}
CompactionOutcome::Skipped => {}
CompactionOutcome::Pending => has_pending_l0 = true,
CompactionOutcome::YieldForL0 => has_pending_l0 = true,
}
}
if has_pending_l0 {
return Ok(CompactionOutcome::YieldForL0); // do another pass
}
.filter_map(|(timeline_id, timeline)| {
let (is_active, (can_offload, _)) =
(timeline.is_active(), timeline.can_offload());
let has_no_unoffloaded_children = {
!timelines
.iter()
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
};
let config_allows_offload = self.conf.timeline_offloading
|| self
.tenant_conf
.load()
.tenant_conf
.timeline_offloading
.unwrap_or_default();
let can_offload =
can_offload && has_no_unoffloaded_children && config_allows_offload;
if (is_active, can_offload) == (false, false) {
None
} else {
Some((*timeline_id, timeline.clone(), (is_active, can_offload)))
}
})
.collect::<Vec<_>>();
drop(timelines);
}
// Pass 2: image compaction and timeline offloading. If any timelines have accumulated
// more L0 layers, they may also be compacted here.
//
// NB: image compaction may yield if there is pending L0 compaction.
//
// TODO: it will only yield if there is pending L0 compaction on the same timeline. If a
// different timeline needs compaction, it won't. It should check `l0_compaction_trigger`.
// We leave this for a later PR.
//
// TODO: consider ordering timelines by some priority, e.g. time since last full compaction,
// amount of L1 delta debt or garbage, offload-eligible timelines first, etc.
let mut has_pending = false;
for timeline in compact {
if !timeline.is_active() {
continue;
}
// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
info!("Skipping compaction due to previous failures");
return Ok(false);
}
let mut outcome = timeline
.compact(cancel, EnumSet::default(), ctx)
.instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id))
.await
.inspect_err(|err| self.maybe_trip_compaction_breaker(err))?;
let mut has_pending_task = false;
// If we're done compacting, check the scheduled GC compaction queue for more work.
if outcome == CompactionOutcome::Done {
let queue = self
.scheduled_compaction_tasks
.lock()
.unwrap()
.get(&timeline.timeline_id)
.cloned();
if let Some(queue) = queue {
outcome = queue
.iteration(cancel, ctx, &self.gc_block, &timeline)
.await?;
}
}
// If we're done compacting, offload the timeline if requested.
if outcome == CompactionOutcome::Done && offload.contains(&timeline.timeline_id) {
pausable_failpoint!("before-timeline-auto-offload");
offload_timeline(self, &timeline)
.instrument(info_span!("offload_timeline", timeline_id = %timeline.timeline_id))
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
let pending_task_left = if *can_compact {
let has_pending_l0_compaction_task = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.or_else(|err| match err {
// Ignore this, we likely raced with unarchival.
OffloadError::NotArchived => Ok(()),
err => Err(err),
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
}
match outcome {
CompactionOutcome::Done => {}
CompactionOutcome::Skipped => {}
CompactionOutcome::Pending => has_pending = true,
// This mostly makes sense when the L0-only pass above is enabled, since there's
// otherwise no guarantee that we'll start with the timeline that has high L0.
CompactionOutcome::YieldForL0 => return Ok(CompactionOutcome::YieldForL0),
if has_pending_l0_compaction_task {
Some(true)
} else {
let queue = {
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard.get(timeline_id).cloned()
};
if let Some(queue) = queue {
let has_pending_tasks = queue
.iteration(cancel, ctx, &self.gc_block, timeline)
.await?;
Some(has_pending_tasks)
} else {
Some(false)
}
}
} else {
None
};
has_pending_task |= pending_task_left.unwrap_or(false);
if pending_task_left == Some(false) && *can_offload {
pausable_failpoint!("before-timeline-auto-offload");
match offload_timeline(self, timeline)
.instrument(info_span!("offload_timeline", %timeline_id))
.await
{
Err(OffloadError::NotArchived) => {
// Ignore this, we likely raced with unarchival
Ok(())
}
other => other,
}?;
}
}
// Success! Untrip the breaker if necessary.
self.compaction_circuit_breaker
.lock()
.unwrap()
.success(&CIRCUIT_BREAKERS_UNBROKEN);
match has_pending {
true => Ok(CompactionOutcome::Pending),
false => Ok(CompactionOutcome::Done),
}
}
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
match err {
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
CompactionError::Other(err) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
}
Ok(has_pending_task)
}
/// Cancel scheduled compaction tasks
@@ -3134,28 +3088,32 @@ impl Tenant {
Ok(rx)
}
/// Performs periodic housekeeping, via the tenant housekeeping background task.
async fn housekeeping(&self) {
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
// during ingest, but we don't want idle timelines to hold open layers for too long.
let timelines = self
.timelines
.lock()
.unwrap()
.values()
.filter(|tli| tli.is_active())
.cloned()
.collect_vec();
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
async fn ingest_housekeeping(&self) {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
// compaction runs.
let timelines = {
self.timelines
.lock()
.unwrap()
.values()
.filter_map(|timeline| {
if timeline.is_active() {
Some(timeline.clone())
} else {
None
}
})
.collect::<Vec<_>>()
};
for timeline in timelines {
for timeline in &timelines {
timeline.maybe_freeze_ephemeral_layer().await;
}
// Shut down walredo if idle.
const WALREDO_IDLE_TIMEOUT: Duration = Duration::from_secs(180);
if let Some(ref walredo_mgr) = self.walredo_mgr {
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
}
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -3865,13 +3823,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.compaction_upper_limit)
}
pub fn get_compaction_l0_first(&self) -> bool {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.compaction_l0_first
.unwrap_or(self.conf.default_tenant_conf.compaction_l0_first)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -3926,16 +3877,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub fn get_timeline_offloading_enabled(&self) -> bool {
if self.conf.timeline_offloading {
return true;
}
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.timeline_offloading
.unwrap_or(self.conf.default_tenant_conf.timeline_offloading)
}
/// Generate an up-to-date TenantManifest based on the state of this Tenant.
fn build_tenant_manifest(&self) -> TenantManifest {
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
@@ -4174,7 +4115,6 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
l0_compaction_trigger: Arc::new(Notify::new()),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
@@ -4702,26 +4642,22 @@ impl Tenant {
// check against last actual 'latest_gc_cutoff' first
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
// and then the planned GC cutoff
{
let gc_info = src_timeline.gc_info.read().unwrap();
let planned_cutoff = gc_info.min_cutoff();
if gc_info.lsn_covered_by_lease(start_lsn) {
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
} else {
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
// and then the planned GC cutoff
if start_lsn < planned_cutoff {
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid branch start lsn: less than planned GC cutoff {planned_cutoff}"
)));
}
let cutoff = gc_info.min_cutoff();
if start_lsn < cutoff {
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid branch start lsn: less than planned GC cutoff {cutoff}"
)));
}
}
@@ -5083,19 +5019,12 @@ impl Tenant {
)
}
/// Builds required resources for a new timeline.
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = self.build_timeline_remote_client(timeline_id);
self.get_timeline_resources_for(remote_client)
}
/// Builds timeline resources for the given remote client.
fn get_timeline_resources_for(&self, remote_client: RemoteTimelineClient) -> TimelineResources {
TimelineResources {
remote_client,
remote_client: self.build_timeline_remote_client(timeline_id),
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}
@@ -5541,7 +5470,6 @@ pub(crate) mod harness {
compaction_threshold: Some(tenant_conf.compaction_threshold),
compaction_upper_limit: Some(tenant_conf.compaction_upper_limit),
compaction_algorithm: Some(tenant_conf.compaction_algorithm),
compaction_l0_first: Some(tenant_conf.compaction_l0_first),
l0_flush_delay_threshold: tenant_conf.l0_flush_delay_threshold,
l0_flush_stall_threshold: tenant_conf.l0_flush_stall_threshold,
l0_flush_wait_upload: Some(tenant_conf.l0_flush_wait_upload),
@@ -5563,9 +5491,6 @@ pub(crate) mod harness {
image_layer_creation_check_threshold: Some(
tenant_conf.image_layer_creation_check_threshold,
),
image_creation_preempt_threshold: Some(
tenant_conf.image_creation_preempt_threshold,
),
lsn_lease_length: Some(tenant_conf.lsn_lease_length),
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
timeline_offloading: Some(tenant_conf.timeline_offloading),
@@ -7769,18 +7694,6 @@ mod tests {
}
tline.freeze_and_flush().await?;
// Force layers to L1
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
if iter % 5 == 0 {
let (_, before_delta_file_accessed) =
@@ -7793,7 +7706,6 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
@@ -8240,8 +8152,6 @@ mod tests {
let cancel = CancellationToken::new();
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
tline.force_set_disk_consistent_lsn(Lsn(0x40));
tline
.compact(
&cancel,
@@ -8255,7 +8165,8 @@ mod tests {
)
.await
.unwrap();
// Image layers are created at repartition LSN
// Image layers are created at last_record_lsn
let images = tline
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
.await

View File

@@ -285,10 +285,6 @@ pub struct TenantConfOpt {
#[serde(default)]
pub compaction_algorithm: Option<CompactionAlgorithmSettings>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub compaction_l0_first: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub l0_flush_delay_threshold: Option<usize>,
@@ -361,9 +357,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_creation_check_threshold: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_creation_preempt_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
@@ -420,9 +413,6 @@ impl TenantConfOpt {
.as_ref()
.unwrap_or(&global_conf.compaction_algorithm)
.clone(),
compaction_l0_first: self
.compaction_l0_first
.unwrap_or(global_conf.compaction_l0_first),
l0_flush_delay_threshold: self
.l0_flush_delay_threshold
.or(global_conf.l0_flush_delay_threshold),
@@ -463,9 +453,6 @@ impl TenantConfOpt {
image_layer_creation_check_threshold: self
.image_layer_creation_check_threshold
.unwrap_or(global_conf.image_layer_creation_check_threshold),
image_creation_preempt_threshold: self
.image_creation_preempt_threshold
.unwrap_or(global_conf.image_creation_preempt_threshold),
lsn_lease_length: self
.lsn_lease_length
.unwrap_or(global_conf.lsn_lease_length),
@@ -473,7 +460,7 @@ impl TenantConfOpt {
.lsn_lease_length_for_ts
.unwrap_or(global_conf.lsn_lease_length_for_ts),
timeline_offloading: self
.timeline_offloading
.lazy_slru_download
.unwrap_or(global_conf.timeline_offloading),
wal_receiver_protocol_override: self
.wal_receiver_protocol_override
@@ -500,7 +487,6 @@ impl TenantConfOpt {
mut compaction_threshold,
mut compaction_upper_limit,
mut compaction_algorithm,
mut compaction_l0_first,
mut l0_flush_delay_threshold,
mut l0_flush_stall_threshold,
mut l0_flush_wait_upload,
@@ -518,7 +504,6 @@ impl TenantConfOpt {
mut lazy_slru_download,
mut timeline_get_throttle,
mut image_layer_creation_check_threshold,
mut image_creation_preempt_threshold,
mut lsn_lease_length,
mut lsn_lease_length_for_ts,
mut timeline_offloading,
@@ -546,7 +531,6 @@ impl TenantConfOpt {
.compaction_upper_limit
.apply(&mut compaction_upper_limit);
patch.compaction_algorithm.apply(&mut compaction_algorithm);
patch.compaction_l0_first.apply(&mut compaction_l0_first);
patch
.l0_flush_delay_threshold
.apply(&mut l0_flush_delay_threshold);
@@ -594,9 +578,6 @@ impl TenantConfOpt {
patch
.image_layer_creation_check_threshold
.apply(&mut image_layer_creation_check_threshold);
patch
.image_creation_preempt_threshold
.apply(&mut image_creation_preempt_threshold);
patch
.lsn_lease_length
.map(|v| humantime::parse_duration(&v))?
@@ -628,7 +609,6 @@ impl TenantConfOpt {
compaction_threshold,
compaction_upper_limit,
compaction_algorithm,
compaction_l0_first,
l0_flush_delay_threshold,
l0_flush_stall_threshold,
l0_flush_wait_upload,
@@ -646,7 +626,6 @@ impl TenantConfOpt {
lazy_slru_download,
timeline_get_throttle,
image_layer_creation_check_threshold,
image_creation_preempt_threshold,
lsn_lease_length,
lsn_lease_length_for_ts,
timeline_offloading,
@@ -691,7 +670,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
compaction_period: value.compaction_period.map(humantime),
compaction_threshold: value.compaction_threshold,
compaction_upper_limit: value.compaction_upper_limit,
compaction_l0_first: value.compaction_l0_first,
l0_flush_delay_threshold: value.l0_flush_delay_threshold,
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
l0_flush_wait_upload: value.l0_flush_wait_upload,
@@ -711,7 +689,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle,
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
image_creation_preempt_threshold: value.image_creation_preempt_threshold,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
timeline_offloading: value.timeline_offloading,

View File

@@ -2816,8 +2816,8 @@ where
}
use {
crate::tenant::gc_result::GcResult, http_utils::error::ApiError,
pageserver_api::models::TimelineGcRequest,
crate::tenant::gc_result::GcResult, pageserver_api::models::TimelineGcRequest,
utils::http::error::ApiError,
};
#[cfg(test)]

View File

@@ -195,7 +195,7 @@ use utils::backoff::{
use utils::pausable_failpoint;
use utils::shard::ShardNumber;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
@@ -219,7 +219,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
@@ -437,7 +437,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
@@ -460,7 +461,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
@@ -482,7 +484,8 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.map_or(0, |r| r.concurrency_limit());
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
@@ -499,21 +502,37 @@ impl RemoteTimelineClient {
/// Notify this client of a change to its parent tenant's config, as this may cause us to
/// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
pub(super) fn update_config(self: &Arc<Self>, location_conf: &AttachedLocationConfig) {
pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
let new_conf = RemoteTimelineClientConfig::from(location_conf);
let unblocked = !new_conf.block_deletions;
let mut block_deletions_below = Generation::None;
if new_conf.block_deletions {
block_deletions_below = self.generation;
}
// Update config before draining deletions, so that we don't race with more being
// inserted. This can result in deletions happening our of order, but that does not
// violate any invariants: deletions only need to be ordered relative to upload of the index
// that dereferences the deleted objects, and we are not changing that order.
*self.config.write().unwrap() = new_conf;
// If the conf change may unblock any deletions, try to launch them.
if let Ok(queue) = self.upload_queue.lock().unwrap().initialized_mut() {
if block_deletions_below != queue.block_deletions_below {
queue.block_deletions_below = block_deletions_below;
self.launch_queued_tasks(queue);
if unblocked {
// If we may now delete layers, drain any that were blocked in our old
// configuration state
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
for d in blocked_deletions {
if let Err(e) = self.deletion_queue_client.push_layers_sync(
self.tenant_shard_id,
self.timeline_id,
self.generation,
d.layers,
) {
// This could happen if the pageserver is shut down while a tenant
// is transitioning from a deletion-blocked state: we will leak some
// S3 objects in this case.
warn!("Failed to drain blocked deletions: {}", e);
break;
}
}
}
}
}
@@ -1169,7 +1188,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata);
let op = UploadOp::UploadLayer(layer, metadata, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1395,6 +1414,13 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
Ok(())
}
fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
@@ -1869,16 +1895,31 @@ impl RemoteTimelineClient {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");
// Handle barriers and shutdowns.
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
{
*mode = Some(OpType::FlushDeletion);
} else {
*mode = Some(OpType::MayReorder)
}
}
UploadOp::UploadMetadata { .. } => {}
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
}
UploadOp::Barrier(sender) => {
sender.send_replace(());
continue;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
UploadOp::UploadLayer(..) => {}
UploadOp::UploadMetadata { .. } => {}
UploadOp::Delete(Delete { .. }) => {}
};
// Assign unique ID to this task
@@ -1950,7 +1991,7 @@ impl RemoteTimelineClient {
// Assert that we don't modify a layer that's referenced by the current index.
if cfg!(debug_assertions) {
let modified = match &task.op {
UploadOp::UploadLayer(layer, layer_metadata) => {
UploadOp::UploadLayer(layer, layer_metadata, _) => {
vec![(layer.layer_desc().layer_name(), layer_metadata)]
}
UploadOp::Delete(delete) => {
@@ -1972,7 +2013,68 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
let mut queue_locked = self.upload_queue.lock().unwrap();
let mut detected = false;
if let Ok(queue) = queue_locked.initialized_mut() {
for list in queue.blocked_deletions.iter_mut() {
list.layers.retain(|(name, meta)| {
if name == &layer.layer_desc().layer_name()
&& meta.generation == layer_metadata.generation
{
detected = true;
// remove the layer from deletion queue
false
} else {
// keep the layer
true
}
});
}
}
if detected {
info!(
"cancelled blocked deletion of layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
} else {
// TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions
// that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted,
// which is not possible in the current system.
info!(
"waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
{
// We are going to flush, we can clean up the recently deleted list.
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.recently_deleted.clear();
}
}
if let Err(e) = self.deletion_queue_client.flush_execute().await {
warn!(
"failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
} else {
info!(
"done flushing deletion queue before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
}
}
let local_path = layer.local_path();
// We should only be uploading layers created by this `Tenant`'s lifetime, so
@@ -2037,17 +2139,24 @@ impl RemoteTimelineClient {
res
}
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
// TODO: these can't go through the deletion queue, since it's asynchronous and
// may clobber a later write to this later.
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.map_err(|e| anyhow::anyhow!(e))
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.blocked_deletions.push(delete.clone());
}
Ok(())
} else {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier operations are handled synchronously in
@@ -2129,7 +2238,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);
let lsn_update = match task.op {
UploadOp::UploadLayer(_, _) => None,
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
@@ -2204,7 +2313,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m) => (
UploadOp::UploadLayer(_, m, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2294,11 +2403,12 @@ impl RemoteTimelineClient {
.clone(),
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
block_deletions_below: Generation::None,
#[cfg(feature = "testing")]
dangling_files: HashMap::default(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
recently_deleted: HashSet::new(),
};
let upload_queue = std::mem::replace(

View File

@@ -9,14 +9,13 @@ use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode,
mgr::{GetTenantError, TenantManager},
mgr::GetTenantError,
mgr::TenantManager,
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
tasks::{warn_when_period_overrun, BackgroundLoopKind},
Tenant,
},
virtual_file::VirtualFile,
TEMP_FILE_SUFFIX,
};
use futures::Future;
@@ -33,10 +32,7 @@ use super::{
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension,
yielding_loop::yielding_loop,
};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
@@ -465,18 +461,6 @@ async fn upload_tenant_heatmap(
}
}
// After a successful upload persist the fresh heatmap to disk.
// When restarting, the tenant will read the heatmap from disk
// and additively generate a new heatmap (see [`Timeline::generate_heatmap`]).
// If the heatmap is stale, the additive generation can lead to keeping previously
// evicted timelines on the secondarie's disk.
let tenant_shard_id = tenant.get_tenant_shard_id();
let heatmap_path = tenant.conf.tenant_heatmap_path(tenant_shard_id);
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
if let Err(err) = VirtualFile::crashsafe_overwrite(heatmap_path, temp_path, bytes).await {
tracing::warn!("Non fatal IO error writing to disk after heatmap upload: {err}");
}
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {

View File

@@ -44,7 +44,7 @@ pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
use self::inmemory_layer::InMemoryLayerFileId;
use super::timeline::{GetVectoredError, ReadPath};
use super::timeline::GetVectoredError;
use super::PageReconstructError;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
@@ -262,8 +262,6 @@ pub(crate) struct ValuesReconstructState {
pub(crate) io_concurrency: IoConcurrency,
num_active_ios: Arc<AtomicUsize>,
pub(crate) read_path: Option<ReadPath>,
}
/// The level of IO concurrency to be used on the read path
@@ -611,7 +609,6 @@ impl ValuesReconstructState {
delta_layers_visited: 0,
io_concurrency,
num_active_ios: Arc::new(AtomicUsize::new(0)),
read_path: None,
}
}

View File

@@ -166,10 +166,6 @@ impl BatchLayerWriter {
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
pub fn pending_layer_num(&self) -> usize {
self.generated_layer_writers.len()
}
}
/// An image writer that takes images and produces multiple image layers.

View File

@@ -353,6 +353,7 @@ impl Layer {
/// while the guard exists.
///
/// Returns None if the layer is currently evicted or becoming evicted.
#[cfg(test)]
pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
@@ -529,6 +530,7 @@ impl ResidentOrWantedEvicted {
/// This is not used on the read path (anything that calls
/// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
/// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
#[cfg(test)]
fn get(&self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),

Some files were not shown because too many files have changed in this diff Show More