Compare commits

..

1 Commits

Author SHA1 Message Date
Christian Schwarz
c0881a1407 break bench_ingest compilation on all platforms 2025-04-24 16:51:42 +02:00
117 changed files with 1544 additions and 3904 deletions

View File

@@ -49,10 +49,6 @@ inputs:
description: 'A JSON object with project settings'
required: false
default: '{}'
default_endpoint_settings:
description: 'A JSON object with the default endpoint settings'
required: false
default: '{}'
outputs:
dsn:
@@ -70,9 +66,9 @@ runs:
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
res=$(curl \
project=$(curl \
"https://${API_HOST}/api/v2/projects" \
-w "%{http_code}" \
--fail \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" \
@@ -87,15 +83,6 @@ runs:
\"settings\": ${PROJECT_SETTINGS}
}
}")
code=${res: -3}
if [[ ${code} -ge 400 ]]; then
echo Request failed with error code ${code}
echo ${res::-3}
exit 1
else
project=${res::-3}
fi
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
@@ -139,22 +126,6 @@ runs:
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"scheduling\": \"Essential\"}"
fi
# XXX
# This is a workaround for the default endpoint settings, which currently do not allow some settings in the public API.
# https://github.com/neondatabase/cloud/issues/27108
if [[ -n ${DEFAULT_ENDPOINT_SETTINGS} && ${DEFAULT_ENDPOINT_SETTINGS} != "{}" ]] ; then
PROJECT_DATA=$(curl -X GET \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}" \
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
-d "{\"scheduling\": \"Essential\"}"
)
NEW_DEFAULT_ENDPOINT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.default_endpoint_settings + ${DEFAULT_ENDPOINT_SETTINGS}")
curl -X POST --fail \
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/default_endpoint_settings" \
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
--data "${NEW_DEFAULT_ENDPOINT_SETTINGS}"
fi
env:
API_HOST: ${{ inputs.api_host }}
@@ -171,4 +142,3 @@ runs:
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}
DEFAULT_ENDPOINT_SETTINGS: ${{ inputs.default_endpoint_settings }}

View File

@@ -28,16 +28,6 @@ on:
required: false
default: 'disabled'
type: string
test-selection:
description: 'specification of selected test(s) to run'
required: false
default: ''
type: string
test-run-count:
description: 'number of runs to perform for selected tests'
required: false
default: 1
type: number
defaults:
run:
@@ -285,7 +275,7 @@ jobs:
for io_mode in buffered direct direct-rw ; do
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE=$io_mode \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOMODE=$io_mode \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
@@ -391,22 +381,21 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.test-run-count == 1 }}
rerun_failed: true
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky

View File

@@ -34,10 +34,11 @@ permissions:
jobs:
build-pgxn:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
strategy:
@@ -62,8 +63,13 @@ jobs:
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -99,21 +105,13 @@ jobs:
run: |
make postgres-headers-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
- name: Upload "pg_install/${{ matrix.postgres-version }}" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: pg_install--${{ matrix.postgres-version }}
path: pg_install/${{ matrix.postgres-version }}
# The artifact is supposed to be used by the next job in the same workflow,
# so theres no need to store it for too long.
retention-days: 1
build-walproposer-lib:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn]
@@ -134,16 +132,27 @@ jobs:
id: pg_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
- name: Download "pg_install/v17" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
- name: Cache postgres v17 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
name: pg_install--v17
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -169,21 +178,13 @@ jobs:
run:
make walproposer-lib -j$(sysctl -n hw.ncpu)
- name: Upload "pg_install/build/walproposer-lib" artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: pg_install--build--walproposer-lib
path: pg_install/build/walproposer-lib
# The artifact is supposed to be used by the next job in the same workflow,
# so theres no need to store it for too long.
retention-days: 1
cargo-build:
if: |
inputs.pg_versions != '[]' || inputs.rebuild_rust_code || inputs.rebuild_everything ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
(inputs.pg_versions != '[]' || inputs.rebuild_rust_code || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn, build-walproposer-lib]
@@ -202,45 +203,72 @@ jobs:
with:
submodules: true
- name: Download "pg_install/v14" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
- name: Set pg v14 for caching
id: pg_rev_v14
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v15 for caching
id: pg_rev_v15
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v16 for caching
id: pg_rev_v16
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v17 for caching
id: pg_rev_v17
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
- name: Cache postgres v14 build
id: cache_pg
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
name: pg_install--v14
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
- name: Download "pg_install/v15" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
name: pg_install--v15
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
- name: Download "pg_install/v16" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
name: pg_install--v16
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
- name: Download "pg_install/v17" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
name: pg_install--v17
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Download "pg_install/build/walproposer-lib" artifact
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: pg_install--build--walproposer-lib
path: pg_install/build/walproposer-lib
# `actions/download-artifact` doesn't preserve permissions:
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss
- name: Make pg_install/v*/bin/* executable
run: |
chmod +x pg_install/v*/bin/*
- name: Cache cargo deps
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
- name: Cache cargo deps (only for v17)
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: |
~/.cargo/registry
!~/.cargo/registry/src
@@ -248,6 +276,18 @@ jobs:
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
with:
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Install build dependencies
run: |
brew install flex bison openssl protobuf icu4c
@@ -257,8 +297,8 @@ jobs:
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build
- name: Run cargo build (only for v17)
run: cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced
- name: Check that no warnings are produced (only for v17)
run: ./run_clippy.sh

View File

@@ -1,120 +0,0 @@
name: Build and Run Selected Test
on:
workflow_dispatch:
inputs:
test-selection:
description: 'Specification of selected test(s), as accepted by pytest -k'
required: true
type: string
run-count:
description: 'Number of test runs to perform'
required: true
type: number
archs:
description: 'Archs to run tests on, e. g.: ["x64", "arm64"]'
default: '["x64"]'
required: true
type: string
build-types:
description: 'Build types to run tests on, e. g.: ["debug", "release"]'
default: '["release"]'
required: true
type: string
pg-versions:
description: 'Postgres versions to use for testing, e.g,: [{"pg_version":"v16"}, {"pg_version":"v17"}])'
default: '[{"pg_version":"v17"}]'
required: true
type: string
defaults:
run:
shell: bash -euxo pipefail {0}
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
meta:
uses: ./.github/workflows/_meta.yml
with:
github-event-name: ${{ github.event_name }}
github-event-json: ${{ toJSON(github.event) }}
build-and-test-locally:
needs: [ meta ]
strategy:
fail-fast: false
matrix:
arch: ${{ fromJson(inputs.archs) }}
build-type: ${{ fromJson(inputs.build-types) }}
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ghcr.io/neondatabase/build-tools:pinned-bookworm
build-tag: ${{ needs.meta.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
secrets: inherit
create-test-report:
needs: [ build-and-test-locally ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }}
- uses: actions/github-script@v7
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -324,7 +324,7 @@ jobs:
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
SYNC_BETWEEN_TESTS: true
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

View File

@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

View File

@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

View File

@@ -1,112 +0,0 @@
name: Cloud Extensions Test
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '45 1 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
inputs:
region_id:
description: 'Project region id. If not set, the default region will be used'
required: false
default: 'aws-us-east-2'
defaults:
run:
shell: bash -euxo pipefail {0}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
jobs:
regress:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
runs-on: [ self-hosted, small ]
container:
# We use the neon-test-extensions image here as it contains the source code for the extensions.
image: ghcr.io/neondatabase/neon-test-extensions-v${{ matrix.pg-version }}:latest
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Evaluate the settings
id: project-settings
run: |
if [[ $((${{ matrix.pg-version }})) -lt 17 ]]; then
ULID=ulid
else
ULID=pgx_ulid
fi
LIBS=timescaledb:rag_bge_small_en_v15,rag_jina_reranker_v1_tiny_en:$ULID
settings=$(jq -c -n --arg libs $LIBS '{preload_libraries:{use_defaults:false,enabled_libraries:($libs| split(":"))}}')
echo settings=$settings >> $GITHUB_OUTPUT
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ inputs.region_id }}
postgres_version: ${{ matrix.pg-version }}
project_settings: ${{ steps.project-settings.outputs.settings }}
# We need these settings to get the expected output results.
# We cannot use the environment variables e.g. PGTZ due to
# https://github.com/neondatabase/neon/issues/1287
default_endpoint_settings: >
{
"pg_settings": {
"DateStyle": "Postgres,MDY",
"TimeZone": "America/Los_Angeles",
"compute_query_id": "off",
"neon.allow_unstable_extensions": "on"
}
}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
- name: Run the regression tests
run: /run-tests.sh -r /ext-src
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
SKIP: "pg_hint_plan-src,pg_repack-src,pg_cron-src,plpgsql_check-src"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: ${{ vars.SLACK_ON_CALL_QA_STAGING_STREAM }}
slack-message: |
Periodic extensions test on staging: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -14,7 +14,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

View File

@@ -28,7 +28,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit
@@ -75,7 +75,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

View File

@@ -69,6 +69,10 @@ jobs:
check-macos-build:
needs: [ check-permissions, files-changed ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
uses: ./.github/workflows/build-macos.yml
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}

View File

@@ -41,7 +41,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

View File

@@ -35,7 +35,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit
@@ -73,7 +73,7 @@ jobs:
}}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
uses: step-security/harden-runner@v2
with:
egress-policy: audit

5
Cargo.lock generated
View File

@@ -1323,6 +1323,7 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"spki 0.7.3",
"tar",
"thiserror 1.0.69",
"tokio",
@@ -4301,7 +4302,6 @@ dependencies = [
"remote_storage",
"reqwest",
"rpds",
"rstest",
"rustls 0.23.18",
"scopeguard",
"send-future",
@@ -6616,14 +6616,12 @@ dependencies = [
"anyhow",
"async-stream",
"bytes",
"camino",
"clap",
"const_format",
"futures",
"futures-core",
"futures-util",
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper-util",
@@ -6633,7 +6631,6 @@ dependencies = [
"prost 0.13.3",
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tonic",
"tonic-build",
"tracing",

View File

@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
&& rm -rf protoc.zip protoc
# s5cmd
ENV S5CMD_VERSION=2.3.0
ENV S5CMD_VERSION=2.2.2
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
&& chmod +x s5cmd \
&& mv s5cmd /usr/local/bin/s5cmd
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
&& rm awscliv2.zip
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.37.1
ENV MOLD_VERSION=v2.34.1
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
# Python
ENV PYTHON_VERSION=3.11.12 \
ENV PYTHON_VERSION=3.11.10 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
ARG CARGO_HACK_VERSION=0.6.36
ARG CARGO_NEXTEST_VERSION=0.9.94
ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \

View File

@@ -1800,8 +1800,8 @@ COPY compute/patches/pg_repack.patch /ext-src
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl\
&& apt clean && rm -rf /ext-src/*.tar.gz /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433

View File

@@ -44,6 +44,7 @@ serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
spki = { version = "0.7.3", features = ["std"] }
tar.workspace = true
tower.workspace = true
tower-http.workspace = true

View File

@@ -57,24 +57,13 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
} else {
Ok("http://pg-ext-s3-gateway".to_string())
}
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
#[arg(short = 'r', long)]
pub remote_ext_config: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running

View File

@@ -1,8 +1,8 @@
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -81,22 +81,6 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
register_gauge!(
"compute_pg_current_downtime_ms",
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
)
.expect("failed to define a metric")
});
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
register_int_counter!(
"compute_pg_downtime_ms_total",
"Cumulative duration of Postgres downtime in ms",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -104,7 +88,5 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics
}

View File

@@ -6,294 +6,197 @@ use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};
use tracing::{debug, error, info, warn};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
struct ComputeMonitor {
compute: Arc<ComputeNode>,
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.params.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
/// The moment when Postgres had some activity,
/// that should prevent compute from being suspended.
last_active: Option<DateTime<Utc>>,
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(compute);
/// The moment when we last tried to check Postgres.
last_checked: DateTime<Utc>,
/// The last moment we did a successful Postgres check.
last_up: DateTime<Utc>,
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
active_time: Option<f64>,
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
sessions: Option<i64>,
let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
let mut prev_sessions: Option<i64> = None;
/// Use experimental statistics-based activity monitor. It's no longer
/// 'experimental' per se, as it's enabled for everyone, but we still
/// keep the flag as an option to turn it off in some cases if it will
/// misbehave.
experimental: bool,
}
impl ComputeMonitor {
fn report_down(&self) {
let now = Utc::now();
// Calculate and report current downtime
// (since the last time Postgres was up)
let downtime = now.signed_duration_since(self.last_up);
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
// Calculate and update total downtime
// (cumulative duration of Postgres downtime in ms)
let inc = now
.signed_duration_since(self.last_checked)
.num_milliseconds();
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
info!("starting experimental activity monitor for {}", connstr);
} else {
info!("starting activity monitor for {}", connstr);
}
fn report_up(&mut self) {
self.last_up = Utc::now();
PG_CURR_DOWNTIME_MS.set(0.0);
}
fn downtime_info(&self) -> String {
format!(
"total_ms: {}, current_ms: {}, last_up: {}",
PG_TOTAL_DOWNTIME_MS.get(),
PG_CURR_DOWNTIME_MS.get(),
self.last_up
)
}
/// Spin in a loop and figure out the last activity time in the Postgres.
/// Then update it in the shared state. This function never errors out.
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
/// should be handled gracefully.
#[instrument(skip_all)]
pub fn run(&mut self) {
// Suppose that `connstr` doesn't change
let connstr = self.compute.params.connstr.clone();
let conf = self
.compute
.get_conn_conf(Some("compute_ctl:compute_monitor"));
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(&self.compute);
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
info!("starting compute monitor for {}", connstr);
loop {
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!(
downtime_info = self.downtime_info(),
"connection to Postgres is closed, trying to reconnect"
);
self.report_down();
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
} else {
match self.check(cli) {
Ok(_) => {
self.report_up();
self.compute.update_last_active(self.last_active);
}
Err(e) => {
// Although we have many places where we can return errors in `check()`,
// normally it shouldn't happen. I.e., we will likely return error if
// connection got broken, query timed out, Postgres returned invalid data, etc.
// In all such cases it's suspicious, so let's report this as downtime.
self.report_down();
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
// Reconnect to Postgres just in case. During tests, I noticed
// that queries in `check()` can fail with `connection closed`,
// but `cli.is_closed()` above doesn't detect it. Even if old
// connection is still alive, it will be dropped when we reassign
// `client` to a new connection.
client = conf.connect(NoTls);
}
}
}
}
Err(e) => {
info!(
downtime_info = self.downtime_info(),
"could not connect to Postgres: {}, retrying", e
);
self.report_down();
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// Reset the `last_checked` timestamp and sleep before the next iteration.
self.last_checked = Utc::now();
loop {
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
// But skip the first sleep, so we can connect to Postgres immediately.
if sleep {
// Should be outside of the mutex lock to allow others to read while we sleep.
thread::sleep(MONITOR_CHECK_INTERVAL);
} else {
sleep = true;
}
}
#[instrument(skip_all)]
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
// This is new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if self.experimental {
// Check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!("connection to Postgres is closed, trying to reconnect");
if let Some(prev_active_time) = self.active_time {
if active_time != prev_active_time {
detected_activity = true;
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
continue;
}
// This is a new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
// First, check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still a user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
prev_active_time = match prev_active_time {
Some(prev_active_time) => {
if active_time != prev_active_time {
detected_activity = true;
}
Some(active_time)
}
None => Some(active_time),
};
prev_sessions = match prev_sessions {
Some(prev_sessions) => {
if sessions != prev_sessions {
detected_activity = true;
}
Some(sessions)
}
None => Some(sessions),
};
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
error!("could not get database statistics: {}", e);
continue;
}
}
self.active_time = Some(active_time);
}
if let Some(prev_sessions) = self.sessions {
if sessions != prev_sessions {
detected_activity = true;
// Second, if database statistics is the same, check all backends state change,
// maybe there is some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions, that did nothing yet.
match get_backends_state_change(cli) {
Ok(last_active) => {
compute.update_last_active(last_active);
}
Err(e) => {
error!("could not get backends state change: {}", e);
}
}
// Finally, if there are existing (logical) walsenders, do not suspend.
//
// walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will be
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(ws_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
}
self.sessions = Some(sessions);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
self.last_active = Some(Utc::now());
return Ok(());
Err(e) => {
warn!("failed to parse walsenders count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of walsenders: {:?}", e);
continue;
}
}
Err(e) => {
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
//
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
//
let logical_subscriptions_query =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(logical_subscriptions_query, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
continue;
}
},
Err(e) => {
warn!(
"failed to get list of active logical replication subscriptions: {:?}",
e
);
continue;
}
}
//
// Do not suspend compute if autovacuum is running
//
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(autovacuum_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse autovacuum workers count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of autovacuum workers: {:?}", e);
continue;
}
}
}
}
// If database statistics are the same, check all backends for state changes.
// Maybe there are some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions that have not done anything yet.
match get_backends_state_change(cli) {
Ok(last_active) => match (last_active, self.last_active) {
(Some(last_active), Some(prev_last_active)) => {
if last_active > prev_last_active {
self.last_active = Some(last_active);
return Ok(());
}
}
(Some(last_active), None) => {
self.last_active = Some(last_active);
return Ok(());
}
_ => {}
},
Err(e) => {
return Err(anyhow::anyhow!(
"could not get backends state change: {}",
e
));
debug!("could not connect to Postgres: {}, retrying", e);
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// If there are existing (logical) walsenders, do not suspend.
//
// N.B. walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will.
const WS_COUNT_QUERY: &str =
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(WS_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
let err: anyhow::Error = e.into();
return Err(err.context("failed to parse walsenders count"));
}
},
Err(e) => {
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
}
}
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse 'pg_stat_subscription' count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of active logical replication subscriptions: {}",
e
));
}
}
// Do not suspend compute if autovacuum is running
const AUTOVACUUM_COUNT_QUERY: &str =
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
};
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse autovacuum workers count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of autovacuum workers: {}",
e
));
}
}
Ok(())
}
}
@@ -412,24 +315,9 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
let now = Utc::now();
let mut monitor = ComputeMonitor {
compute,
last_active: None,
last_checked: now,
last_up: now,
active_time: None,
sessions: None,
experimental,
};
let span = span!(Level::INFO, "compute_monitor");
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || {
let _enter = span.enter();
monitor.run();
})
.spawn(move || watch_compute_activity(&compute))
.expect("cannot launch compute monitor thread")
}

View File

@@ -3,6 +3,7 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
use anyhow::{Context, Result, bail};
use compute_api::responses::TlsConfig;
use ring::digest;
use spki::der::{Decode, PemReader};
use x509_cert::Certificate;
#[derive(Clone, Copy)]
@@ -51,7 +52,7 @@ pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
match try_update_key_path_blocking(pg_data, tls_config) {
Ok(()) => break,
Err(e) => {
tracing::error!(error = ?e, "could not create key file");
tracing::error!("could not create key file {e:?}");
std::thread::sleep(Duration::from_secs(1))
}
}
@@ -91,14 +92,8 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;
let certs = Certificate::load_pem_chain(cert.as_bytes())
.context("decoding PEM encoded certificates")?;
// First certificate is our server-cert,
// all the rest of the certs are the CA cert chain.
let Some(cert) = certs.first() else {
bail!("no certificates found");
};
let cert = Certificate::decode(&mut PemReader::new(cert.as_bytes()).context("pem reader")?)
.context("decode cert")?;
match cert.signature_algorithm.oid {
ECDSA_WITH_SHA_256 => {
@@ -120,82 +115,3 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
Ok(())
}
#[cfg(test)]
mod tests {
use super::verify_key_cert;
/// Real certificate chain file, generated by cert-manager in dev.
/// The server auth certificate has expired since 2025-04-24T15:41:35Z.
const CERT: &str = "
-----BEGIN CERTIFICATE-----
MIICCDCCAa+gAwIBAgIQKhLomFcNULbZA/bPdGzaSzAKBggqhkjOPQQDAjBEMQsw
CQYDVQQGEwJVUzESMBAGA1UEChMJTmVvbiBJbmMuMSEwHwYDVQQDExhOZW9uIEs4
cyBJbnRlcm1lZGlhdGUgQ0EwHhcNMjUwNDIzMTU0MTM1WhcNMjUwNDI0MTU0MTM1
WjBBMT8wPQYDVQQDEzZjb21wdXRlLXdpc3B5LWdyYXNzLXcwY21laWp3LmRlZmF1
bHQuc3ZjLmNsdXN0ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATF
QCcG2m/EVHAiZtSsYgVnHgoTjUL/Jtwfdrpvz2t0bVRZmBmSKhlo53uPV9Y5eKFG
AmR54p9/gT2eO3xU7vAgo4GFMIGCMA4GA1UdDwEB/wQEAwIFoDAMBgNVHRMBAf8E
AjAAMB8GA1UdIwQYMBaAFFR2JAhXkeiNQNEixTvAYIwxUu3QMEEGA1UdEQQ6MDiC
NmNvbXB1dGUtd2lzcHktZ3Jhc3MtdzBjbWVpancuZGVmYXVsdC5zdmMuY2x1c3Rl
ci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBLG22wKG8XS9e9RxBT+kmUx/kIThcP
DIpp7jx0PrFcdQIgEMTdnXpx5Cv/Z0NIEDxtMHUD7G0vuRPfztki36JuakM=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICFzCCAb6gAwIBAgIUbbX98N2Ip6lWAONRk8dU9hSz+YIwCgYIKoZIzj0EAwIw
RDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVv
biBBV1MgSW50ZXJtZWRpYXRlIENBMB4XDTI1MDQyMjE1MTAxMFoXDTI1MDcyMTE1
MTAxMFowRDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UE
AxMYTmVvbiBLOHMgSW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAE5++m5owqNI4BPMTVNIUQH0qvU7pYhdpHGVGhdj/Lgars6ROvE6uSNQV4
SAmJN5HBzj5/6kLQaTPWpXW7EHXjK6OBjTCBijAOBgNVHQ8BAf8EBAMCAQYwEgYD
VR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQUVHYkCFeR6I1A0SLFO8BgjDFS7dAw
HwYDVR0jBBgwFoAUgHfNXfyKtHO0V9qoLOWCjkNiaI8wJAYDVR0eAQH/BBowGKAW
MBSCEi5zdmMuY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBObVFFdXaL
QpOXmN60dYUNnQRwjKreFduEkQgOdOlssgIgVAdJJQFgvlrvEOBhY8j5WyeKRwUN
k/ALs6KpgaFBCGY=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIB4jCCAYegAwIBAgIUFlxWFn/11yoGdmD+6gf+yQMToS0wCgYIKoZIzj0EAwIw
ODELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEVMBMGA1UEAxMMTmVv
biBSb290IENBMB4XDTI1MDQwMzA3MTUyMloXDTI2MDQwMzA3MTUyMlowRDELMAkG
A1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVvbiBBV1Mg
SW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqonG/IQ6
ZxtEtOUTkkoNopPieXDO5CBKUkNFTGeJEB7OxRlSpYJgsBpaYIaD6Vc4sVk3thIF
p+pLw52idQOIN6NjMGEwDgYDVR0PAQH/BAQDAgEGMA8GA1UdEwEB/wQFMAMBAf8w
HQYDVR0OBBYEFIB3zV38irRztFfaqCzlgo5DYmiPMB8GA1UdIwQYMBaAFKh7M4/G
FHvr/ORDQZt4bMLlJvHCMAoGCCqGSM49BAMCA0kAMEYCIQCbS4x7QPslONzBYbjC
UQaQ0QLDW4CJHvQ4u4gbWFG87wIhAJMsHQHjP9qTT27Q65zQCR7O8QeLAfha1jrH
Ag/LsxSr
-----END CERTIFICATE-----
";
/// The key corresponding to [`CERT`]
const KEY: &str = "
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDnAnrqmIJjndCLWP1iIO5X3X63Aia48TGpGuMXwvm6IoAoGCCqGSM49
AwEHoUQDQgAExUAnBtpvxFRwImbUrGIFZx4KE41C/ybcH3a6b89rdG1UWZgZkioZ
aOd7j1fWOXihRgJkeeKff4E9njt8VO7wIA==
-----END EC PRIVATE KEY-----
";
/// An incorrect key.
const INCORRECT_KEY: &str = "
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIL6WqqBDyvM0HWz7Ir5M5+jhFWB7IzOClGn26OPrzHCXoAoGCCqGSM49
AwEHoUQDQgAE7XVvdOy5lfwtNKb+gJEUtnG+DrnnXLY5LsHDeGQKV9PTRcEMeCrG
YZzHyML4P6Sr4yi2ts+4B9i47uvAG8+XwQ==
-----END EC PRIVATE KEY-----
";
#[test]
fn certificate_verification() {
verify_key_cert(KEY, CERT).unwrap();
}
#[test]
#[should_panic(expected = "private key file does not match certificate")]
fn certificate_verification_fail() {
verify_key_cert(INCORRECT_KEY, CERT).unwrap();
}
}

View File

@@ -17,10 +17,8 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
@@ -30,6 +28,7 @@ use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{FlockArg, flock};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
@@ -989,8 +988,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
NeonLocalInitConf {
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
listen_https_addr: None,
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
},
safekeepers: vec![SafekeeperConf {
id: DEFAULT_SAFEKEEPER_ID,
@@ -1779,8 +1777,7 @@ async fn handle_endpoint_storage(
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.start(&args.start_timeout).await {
if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
eprintln!("broker start failed: {e}");
exit(1);
}
@@ -1788,8 +1785,7 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local
StorageBrokerCmd::Stop(_args) => {
// FIXME: stop_mode unused
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
if let Err(e) = broker::stop_broker_process(env) {
eprintln!("broker stop failed: {e}");
exit(1);
}
@@ -1839,11 +1835,8 @@ async fn handle_start_all_impl(
#[allow(clippy::redundant_closure_call)]
(|| {
js.spawn(async move {
let storage_broker = StorageBroker::from_env(env);
storage_broker
.start(&retry_timeout)
.await
.map_err(|e| e.context("start storage_broker"))
let retry_timeout = retry_timeout;
broker::start_broker_process(env, &retry_timeout).await
});
js.spawn(async move {
@@ -1998,8 +1991,7 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
if let Err(e) = broker::stop_broker_process(env) {
eprintln!("neon broker stop failed: {e:#}");
}

View File

@@ -3,86 +3,60 @@
//! In the local test environment, the storage broker stores its data directly in
//!
//! ```text
//! .neon/storage_broker
//! .neon
//! ```
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env::LocalEnv};
use crate::{background_process, local_env};
pub struct StorageBroker {
env: LocalEnv,
pub async fn start_broker_process(
env: &local_env::LocalEnv,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
let broker = &env.broker;
let listen_addr = &broker.listen_addr;
print!("Starting neon broker at {}", listen_addr);
let args = [format!("--listen-addr={listen_addr}")];
let client = reqwest::Client::new();
background_process::start_process(
"storage_broker",
&env.base_data_dir,
&env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client
.get(status_url)
.build()
.with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
impl StorageBroker {
/// Create a new `StorageBroker` instance from the environment.
pub fn from_env(env: &LocalEnv) -> Self {
Self { env: env.clone() }
}
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&self.env.storage_broker_data_dir().join("server.crt"),
&self.env.storage_broker_data_dir().join("server.key"),
)?;
}
Ok(())
}
/// Start the storage broker process.
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();
if let Some(addr) = &broker.listen_addr {
args.push(format!("--listen-addr={addr}"));
}
if let Some(addr) = &broker.listen_https_addr {
args.push(format!("--listen-https-addr={addr}"));
}
let client = self.env.create_http_client();
background_process::start_process(
"storage_broker",
&self.env.storage_broker_data_dir(),
&self.env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(self.pid_file_path()),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client.get(status_url).build().with_context(|| {
format!("Failed to construct request to broker endpoint {url}")
})?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
/// Stop the storage broker process.
pub fn stop(&self) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &self.pid_file_path())
}
/// Get the path to the PID file for the storage broker.
fn pid_file_path(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
}
pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
}
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
}

View File

@@ -4,7 +4,7 @@
//! script which will use local paths.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
@@ -14,12 +14,11 @@ use anyhow::{Context, bail};
use clap::ValueEnum;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Url};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -158,16 +157,11 @@ pub struct EndpointStorageConf {
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct NeonBroker {
/// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
pub listen_addr: Option<SocketAddr>,
/// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
/// listen_https_addr is preferred over listen_addr in neon_local.
pub listen_https_addr: Option<SocketAddr>,
/// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
pub listen_addr: SocketAddr,
}
/// A part of storage controller's config the neon_local knows about.
@@ -241,19 +235,18 @@ impl Default for NeonStorageControllerConf {
}
}
// Dummy Default impl to satisfy Deserialize derive.
impl Default for NeonBroker {
fn default() -> Self {
NeonBroker {
listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
} else {
format!(
"http://{}",
self.listen_addr
.expect("at least one address should be set")
)
};
Url::parse(&url).expect("failed to construct url")
Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
}
}
@@ -448,10 +441,6 @@ impl LocalEnv {
self.base_data_dir.join("endpoints")
}
pub fn storage_broker_data_dir(&self) -> PathBuf {
self.base_data_dir.join("storage_broker")
}
pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
self.base_data_dir
.join(format!("pageserver_{pageserver_id}"))
@@ -514,23 +503,6 @@ impl LocalEnv {
)
}
/// Creates HTTP client with local SSL CA certificates.
pub fn create_http_client(&self) -> reqwest::Client {
let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
http_client
.build()
.expect("HTTP client should construct with no error")
}
/// Inspect the base data directory and extract the instance id and instance directory path
/// for all storage controller instances
pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
@@ -939,12 +911,6 @@ impl LocalEnv {
// create endpoints dir
fs::create_dir_all(env.endpoints_path())?;
// create storage broker dir
fs::create_dir_all(env.storage_broker_data_dir())?;
StorageBroker::from_env(&env)
.initialize()
.context("storage broker init failed")?;
// create safekeeper dirs
for safekeeper in &env.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;

View File

@@ -21,6 +21,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use reqwest::Certificate;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -50,6 +51,19 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -66,7 +80,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
env.create_http_client(),
http_client,
endpoint,
{
match conf.http_auth_type {

View File

@@ -87,7 +87,7 @@ impl SafekeeperNode {
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(),
http_client: env.create_http_client(),
http_client: reqwest::Client::new(),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
listen_addr,
}

View File

@@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::Method;
use reqwest::{Certificate, Method};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -153,11 +153,24 @@ impl StorageController {
}
};
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("HTTP client should construct with no error");
Self {
env: env.clone(),
private_key,
public_key,
client: env.create_http_client(),
client: http_client,
config: env.storage_controller.clone(),
listen_port: OnceLock::default(),
}

View File

@@ -9,20 +9,21 @@
# to verify custom image builds (e.g pre-published ones).
#
# A test script for postgres extensions
# Currently supports only v16+
# Currently supports only v16
#
set -eux -o pipefail
export COMPOSE_FILE='docker-compose.yml'
export COMPOSE_PROFILES=test-extensions
cd "$(dirname "${0}")"
COMPOSE_FILE='docker-compose.yml'
cd $(dirname $0)
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
function cleanup() {
cleanup() {
echo "show container information"
docker ps
echo "stop containers..."
docker compose down
docker compose --profile test-extensions -f $COMPOSE_FILE down
}
for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
@@ -30,55 +31,55 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --quiet-pull --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 3; do
# check timeout
(( cnt += 3 ))
if [[ ${cnt} -gt 60 ]]; then
cnt=`expr $cnt + 3`
if [ $cnt -gt 60 ]; then
echo "timeout before the compute is ready."
exit 1
fi
if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
break
fi
done
if [[ ${pg_version} -ge 16 ]]; then
if [ $pg_version -ge 16 ]; then
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
docker exec $COMPUTE_CONTAINER_NAME touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
rm -rf "${TMPDIR}"
docker cp $TEST_CONTAINER_NAME:/ext-src/pg_hint_plan-src/data $TMPDIR/data
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
rm -rf $TMPDIR
# The following block does the same for the contrib/file_fdw test
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
rm -rf "${TMPDIR}"
docker cp $TEST_CONTAINER_NAME:/postgres/contrib/file_fdw/data $TMPDIR/data
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/postgres/contrib/file_fdw/data
rm -rf $TMPDIR
# Apply patches
docker compose exec -T neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [ $EXT_SUCCESS -eq 0 ] || [ $CONTRIB_SUCCESS -eq 0 ]; then
CONTRIB_FAILED=
FAILED=
[[ ${EXT_SUCCESS} -eq 0 ]] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[[ ${CONTRIB_SUCCESS} -eq 0 ]] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in ${FAILED} ${CONTRIB_FAILED}; do
docker compose exec neon-test-extensions bash -c 'for file in $(find '"${d}"' -name regression.diffs -o -name regression.out); do cat ${file}; done' || [[ ${?} -eq 1 ]]
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
for d in $FAILED $CONTRIB_FAILED; do
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
done
exit 1
fi

View File

@@ -1,99 +0,0 @@
# PostgreSQL Extensions for Testing
This directory contains PostgreSQL extensions used primarily for:
1. Testing extension upgrades between different Compute versions
2. Running regression tests with regular users (mostly for cloud instances)
## Directory Structure
Each extension directory follows a standard structure:
- `extension-name-src/` - Directory containing test files for the extension
- `test-upgrade.sh` - Script for testing upgrade scenarios
- `regular-test.sh` - Script for testing with regular users
- Additional test files depending on the extension
## Available Extensions
This directory includes the following extensions:
- `hll-src` - HyperLogLog, a fixed-size data structure for approximating cardinality
- `hypopg-src` - Extension to create hypothetical indexes
- `ip4r-src` - IPv4/v6 and subnet data types
- `pg_cron-src` - Run periodic jobs in PostgreSQL
- `pg_graphql-src` - GraphQL support for PostgreSQL
- `pg_hint_plan-src` - Execution plan hints
- `pg_ivm-src` - Incremental view maintenance
- `pg_jsonschema-src` - JSON Schema validation
- `pg_repack-src` - Reorganize tables with minimal locks
- `pg_roaringbitmap-src` - Roaring bitmap implementation
- `pg_semver-src` - Semantic version data type
- `pg_session_jwt-src` - JWT authentication for PostgreSQL
- `pg_tiktoken-src` - OpenAI Tiktoken tokenizer
- `pg_uuidv7-src` - UUIDv7 implementation for PostgreSQL
- `pgjwt-src` - JWT tokens for PostgreSQL
- `pgrag-src` - Retrieval Augmented Generation for PostgreSQL
- `pgtap-src` - Unit testing framework for PostgreSQL
- `pgvector-src` - Vector similarity search
- `pgx_ulid-src` - ULID data type
- `plv8-src` - JavaScript language for PostgreSQL stored procedures
- `postgresql-unit-src` - SI units for PostgreSQL
- `prefix-src` - Prefix matching for strings
- `rag_bge_small_en_v15-src` - BGE embedding model for RAG
- `rag_jina_reranker_v1_tiny_en-src` - Jina reranker model for RAG
- `rum-src` - RUM access method for text search
## Usage
### Extension Upgrade Testing
The extensions in this directory are used by the `test-upgrade.sh` script to test upgrading extensions between different versions of Neon Compute nodes. The script:
1. Creates a database with extensions installed on an old Compute version
2. Creates timelines for each extension
3. Switches to a new Compute version and tests the upgrade process
4. Verifies extension functionality after upgrade
### Regular User Testing
For testing with regular users (particularly for cloud instances), each extension directory typically contains a `regular-test.sh` script that:
1. Drops the database if it exists
2. Creates a fresh test database
3. Installs the extension
4. Runs regression tests
A note about pg_regress: Since pg_regress attempts to set `lc_messages` for the database by default, which is forbidden for regular users, we create databases manually and use the `--use-existing` option to bypass this limitation.
### CI Workflows
Two main workflows use these extensions:
1. **Cloud Extensions Test** - Tests extensions on Neon cloud projects
2. **Force Test Upgrading of Extension** - Tests upgrading extensions between different Compute versions
These workflows are integrated into the build-and-test pipeline through shell scripts:
- `docker_compose_test.sh` - Tests extensions in a Docker Compose environment
- `test_extensions_upgrade.sh` - Tests extension upgrades between different Compute versions
## Adding New Extensions
To add a new extension for testing:
1. Create a directory named `extension-name-src` in this directory
2. Add at minimum:
- `regular-test.sh` for testing with regular users
- If `regular-test.sh` doesn't exist, the system will look for `neon-test.sh`
- If neither exists, it will try to run `make installcheck`
- `test-upgrade.sh` is only needed if you want to test upgrade scenarios
3. Update the list of extensions in the `test_extensions_upgrade.sh` script if needed for upgrade testing
### Patching Extension Sources
If you need to patch the extension sources:
1. Place the patch file in the extension's directory
2. Apply the patch in the appropriate script (`test-upgrade.sh`, `neon-test.sh`, `regular-test.sh`, or `Makefile`)
3. The patch will be applied during the testing process

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
dropdb --if-exists contrib_regression
createdb contrib_regression
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression setup add_agg agg_oob auto_sparse card_op cast_shape copy_binary cumulative_add_cardinality_correction cumulative_add_comprehensive_promotion cumulative_add_sparse_edge cumulative_add_sparse_random cumulative_add_sparse_step cumulative_union_comprehensive cumulative_union_explicit_explicit cumulative_union_explicit_promotion cumulative_union_probabilistic_probabilistic cumulative_union_sparse_full_representation cumulative_union_sparse_promotion cumulative_union_sparse_sparse disable_hashagg equal explicit_thresh hash hash_any meta_func murmur_bigint murmur_bytea nosparse notequal scalar_oob storedproc transaction typmod typmod_insert union_op

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exists contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --inputdir=test --dbname=contrib_regression hypopg hypo_brin hypo_index_part hypo_include hypo_hash hypo_hide_index

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ip4r ip4r-softerr ip4r-v11

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression pg_cron-test

View File

@@ -1,23 +0,0 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
PGXS="$(dirname "$(pg_config --pgxs)" )"
REGRESS="${PGXS}/../test/regress/pg_regress"
TESTDIR="test"
TESTS=$(ls "${TESTDIR}/sql" | sort )
TESTS=${TESTS//\.sql/}
TESTS=${TESTS/empty_mutations/}
TESTS=${TESTS/function_return_row_is_selectable/}
TESTS=${TESTS/issue_300/}
TESTS=${TESTS/permissions_connection_column/}
TESTS=${TESTS/permissions_functions/}
TESTS=${TESTS/permissions_node_column/}
TESTS=${TESTS/permissions_table_level/}
TESTS=${TESTS/permissions_types/}
TESTS=${TESTS/row_level_security/}
TESTS=${TESTS/sqli_connection/}
dropdb --if-exist contrib_regression
createdb contrib_regression
psql -v ON_ERROR_STOP=1 -f test/fixtures.sql -d contrib_regression
${REGRESS} --use-existing --dbname=contrib_regression --inputdir=${TESTDIR} ${TESTS}

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --encoding=UTF8 --dbname=contrib_regression init base_plan pg_hint_plan ut-init ut-A ut-S ut-J ut-L ut-G ut-R ut-fdw ut-W ut-T ut-fini hints_anywhere plpgsql oldextversions

View File

@@ -1,9 +0,0 @@
#!/bin/sh
set -ex
dropdb --if-exist contrib_regression
createdb contrib_regression
cd "$(dirname ${0})"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
patch -p1 <regular.patch
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression pg_ivm create_immv refresh_immv
patch -R -p1 <regular.patch

View File

@@ -1,309 +0,0 @@
diff --git a/expected/pg_ivm.out b/expected/pg_ivm.out
index e8798ee..4081680 100644
--- a/expected/pg_ivm.out
+++ b/expected/pg_ivm.out
@@ -1363,61 +1363,6 @@ SELECT * FROM mv ORDER BY i;
| 2 | 4 | 2 | 2 | 2
(1 row)
-ROLLBACK;
--- IMMV containing user defined type
-BEGIN;
-CREATE TYPE mytype;
-CREATE FUNCTION mytype_in(cstring)
- RETURNS mytype AS 'int4in'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-NOTICE: return type mytype is only a shell
-CREATE FUNCTION mytype_out(mytype)
- RETURNS cstring AS 'int4out'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-NOTICE: argument type mytype is only a shell
-CREATE TYPE mytype (
- LIKE = int4,
- INPUT = mytype_in,
- OUTPUT = mytype_out
-);
-CREATE FUNCTION mytype_eq(mytype, mytype)
- RETURNS bool AS 'int4eq'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE FUNCTION mytype_lt(mytype, mytype)
- RETURNS bool AS 'int4lt'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE FUNCTION mytype_cmp(mytype, mytype)
- RETURNS integer AS 'btint4cmp'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE OPERATOR = (
- leftarg = mytype, rightarg = mytype,
- procedure = mytype_eq);
-CREATE OPERATOR < (
- leftarg = mytype, rightarg = mytype,
- procedure = mytype_lt);
-CREATE OPERATOR CLASS mytype_ops
- DEFAULT FOR TYPE mytype USING btree AS
- OPERATOR 1 <,
- OPERATOR 3 = ,
- FUNCTION 1 mytype_cmp(mytype,mytype);
-CREATE TABLE t_mytype (x mytype);
-SELECT create_immv('mv_mytype',
- 'SELECT * FROM t_mytype');
-NOTICE: could not create an index on immv "mv_mytype" automatically
-DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
-HINT: Create an index on the immv for efficient incremental maintenance.
- create_immv
--------------
- 0
-(1 row)
-
-INSERT INTO t_mytype VALUES ('1'::mytype);
-SELECT * FROM mv_mytype;
- x
----
- 1
-(1 row)
-
ROLLBACK;
-- outer join is not supported
SELECT create_immv('mv(a,b)',
@@ -1510,112 +1455,6 @@ SELECT create_immv('mv_ivm_only_values1', 'values(1)');
ERROR: VALUES is not supported on incrementally maintainable materialized view
SELECT create_immv('mv_ivm_only_values2', 'SELECT * FROM (values(1)) AS tmp');
ERROR: VALUES is not supported on incrementally maintainable materialized view
--- views containing base tables with Row Level Security
-DROP USER IF EXISTS ivm_admin;
-NOTICE: role "ivm_admin" does not exist, skipping
-DROP USER IF EXISTS ivm_user;
-NOTICE: role "ivm_user" does not exist, skipping
-CREATE USER ivm_admin;
-CREATE USER ivm_user;
---- create a table with RLS
-SET SESSION AUTHORIZATION ivm_admin;
-CREATE TABLE rls_tbl(id int, data text, owner name);
-INSERT INTO rls_tbl VALUES
- (1,'foo','ivm_user'),
- (2,'bar','postgres');
-CREATE TABLE num_tbl(id int, num text);
-INSERT INTO num_tbl VALUES
- (1,'one'),
- (2,'two'),
- (3,'three'),
- (4,'four'),
- (5,'five'),
- (6,'six');
---- Users can access only their own rows
-CREATE POLICY rls_tbl_policy ON rls_tbl FOR SELECT TO PUBLIC USING(owner = current_user);
-ALTER TABLE rls_tbl ENABLE ROW LEVEL SECURITY;
-GRANT ALL on rls_tbl TO PUBLIC;
-GRANT ALL on num_tbl TO PUBLIC;
---- create a view owned by ivm_user
-SET SESSION AUTHORIZATION ivm_user;
-SELECT create_immv('ivm_rls', 'SELECT * FROM rls_tbl');
-NOTICE: could not create an index on immv "ivm_rls" automatically
-DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
-HINT: Create an index on the immv for efficient incremental maintenance.
- create_immv
--------------
- 1
-(1 row)
-
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
- id | data | owner
-----+------+----------
- 1 | foo | ivm_user
-(1 row)
-
-RESET SESSION AUTHORIZATION;
---- inserts rows owned by different users
-INSERT INTO rls_tbl VALUES
- (3,'baz','ivm_user'),
- (4,'qux','postgres');
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
- id | data | owner
-----+------+----------
- 1 | foo | ivm_user
- 3 | baz | ivm_user
-(2 rows)
-
---- combination of diffent kinds of commands
-WITH
- i AS (INSERT INTO rls_tbl VALUES(5,'quux','postgres'), (6,'corge','ivm_user')),
- u AS (UPDATE rls_tbl SET owner = 'postgres' WHERE id = 1),
- u2 AS (UPDATE rls_tbl SET owner = 'ivm_user' WHERE id = 2)
-SELECT;
---
-(1 row)
-
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
- id | data | owner
-----+-------+----------
- 2 | bar | ivm_user
- 3 | baz | ivm_user
- 6 | corge | ivm_user
-(3 rows)
-
----
-SET SESSION AUTHORIZATION ivm_user;
-SELECT create_immv('ivm_rls2', 'SELECT * FROM rls_tbl JOIN num_tbl USING(id)');
-NOTICE: could not create an index on immv "ivm_rls2" automatically
-DETAIL: This target list does not have all the primary key columns, or this view does not contain GROUP BY or DISTINCT clause.
-HINT: Create an index on the immv for efficient incremental maintenance.
- create_immv
--------------
- 3
-(1 row)
-
-RESET SESSION AUTHORIZATION;
-WITH
- x AS (UPDATE rls_tbl SET data = data || '_2' where id in (3,4)),
- y AS (UPDATE num_tbl SET num = num || '_2' where id in (3,4))
-SELECT;
---
-(1 row)
-
-SELECT * FROM ivm_rls2 ORDER BY 1,2,3;
- id | data | owner | num
-----+-------+----------+---------
- 2 | bar | ivm_user | two
- 3 | baz_2 | ivm_user | three_2
- 6 | corge | ivm_user | six
-(3 rows)
-
-DROP TABLE rls_tbl CASCADE;
-NOTICE: drop cascades to 2 other objects
-DETAIL: drop cascades to table ivm_rls
-drop cascades to table ivm_rls2
-DROP TABLE num_tbl CASCADE;
-DROP USER ivm_user;
-DROP USER ivm_admin;
-- automatic index creation
BEGIN;
CREATE TABLE base_a (i int primary key, j int);
diff --git a/sql/pg_ivm.sql b/sql/pg_ivm.sql
index d3c1a01..203213d 100644
--- a/sql/pg_ivm.sql
+++ b/sql/pg_ivm.sql
@@ -454,53 +454,6 @@ DELETE FROM base_t WHERE v = 5;
SELECT * FROM mv ORDER BY i;
ROLLBACK;
--- IMMV containing user defined type
-BEGIN;
-
-CREATE TYPE mytype;
-CREATE FUNCTION mytype_in(cstring)
- RETURNS mytype AS 'int4in'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE FUNCTION mytype_out(mytype)
- RETURNS cstring AS 'int4out'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE TYPE mytype (
- LIKE = int4,
- INPUT = mytype_in,
- OUTPUT = mytype_out
-);
-
-CREATE FUNCTION mytype_eq(mytype, mytype)
- RETURNS bool AS 'int4eq'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE FUNCTION mytype_lt(mytype, mytype)
- RETURNS bool AS 'int4lt'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-CREATE FUNCTION mytype_cmp(mytype, mytype)
- RETURNS integer AS 'btint4cmp'
- LANGUAGE INTERNAL STRICT IMMUTABLE;
-
-CREATE OPERATOR = (
- leftarg = mytype, rightarg = mytype,
- procedure = mytype_eq);
-CREATE OPERATOR < (
- leftarg = mytype, rightarg = mytype,
- procedure = mytype_lt);
-
-CREATE OPERATOR CLASS mytype_ops
- DEFAULT FOR TYPE mytype USING btree AS
- OPERATOR 1 <,
- OPERATOR 3 = ,
- FUNCTION 1 mytype_cmp(mytype,mytype);
-
-CREATE TABLE t_mytype (x mytype);
-SELECT create_immv('mv_mytype',
- 'SELECT * FROM t_mytype');
-INSERT INTO t_mytype VALUES ('1'::mytype);
-SELECT * FROM mv_mytype;
-
-ROLLBACK;
-
-- outer join is not supported
SELECT create_immv('mv(a,b)',
'SELECT a.i, b.i FROM mv_base_a a LEFT JOIN mv_base_b b ON a.i=b.i');
@@ -579,71 +532,6 @@ SELECT create_immv('mv_ivm31', 'SELECT sum(i)/sum(j) FROM mv_base_a');
SELECT create_immv('mv_ivm_only_values1', 'values(1)');
SELECT create_immv('mv_ivm_only_values2', 'SELECT * FROM (values(1)) AS tmp');
-
--- views containing base tables with Row Level Security
-DROP USER IF EXISTS ivm_admin;
-DROP USER IF EXISTS ivm_user;
-CREATE USER ivm_admin;
-CREATE USER ivm_user;
-
---- create a table with RLS
-SET SESSION AUTHORIZATION ivm_admin;
-CREATE TABLE rls_tbl(id int, data text, owner name);
-INSERT INTO rls_tbl VALUES
- (1,'foo','ivm_user'),
- (2,'bar','postgres');
-CREATE TABLE num_tbl(id int, num text);
-INSERT INTO num_tbl VALUES
- (1,'one'),
- (2,'two'),
- (3,'three'),
- (4,'four'),
- (5,'five'),
- (6,'six');
-
---- Users can access only their own rows
-CREATE POLICY rls_tbl_policy ON rls_tbl FOR SELECT TO PUBLIC USING(owner = current_user);
-ALTER TABLE rls_tbl ENABLE ROW LEVEL SECURITY;
-GRANT ALL on rls_tbl TO PUBLIC;
-GRANT ALL on num_tbl TO PUBLIC;
-
---- create a view owned by ivm_user
-SET SESSION AUTHORIZATION ivm_user;
-SELECT create_immv('ivm_rls', 'SELECT * FROM rls_tbl');
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
-RESET SESSION AUTHORIZATION;
-
---- inserts rows owned by different users
-INSERT INTO rls_tbl VALUES
- (3,'baz','ivm_user'),
- (4,'qux','postgres');
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
-
---- combination of diffent kinds of commands
-WITH
- i AS (INSERT INTO rls_tbl VALUES(5,'quux','postgres'), (6,'corge','ivm_user')),
- u AS (UPDATE rls_tbl SET owner = 'postgres' WHERE id = 1),
- u2 AS (UPDATE rls_tbl SET owner = 'ivm_user' WHERE id = 2)
-SELECT;
-SELECT id, data, owner FROM ivm_rls ORDER BY 1,2,3;
-
----
-SET SESSION AUTHORIZATION ivm_user;
-SELECT create_immv('ivm_rls2', 'SELECT * FROM rls_tbl JOIN num_tbl USING(id)');
-RESET SESSION AUTHORIZATION;
-
-WITH
- x AS (UPDATE rls_tbl SET data = data || '_2' where id in (3,4)),
- y AS (UPDATE num_tbl SET num = num || '_2' where id in (3,4))
-SELECT;
-SELECT * FROM ivm_rls2 ORDER BY 1,2,3;
-
-DROP TABLE rls_tbl CASCADE;
-DROP TABLE num_tbl CASCADE;
-
-DROP USER ivm_user;
-DROP USER ivm_admin;
-
-- automatic index creation
BEGIN;
CREATE TABLE base_a (i int primary key, j int);

View File

@@ -1,13 +1,8 @@
EXTENSION = pg_jsonschema
DATA = pg_jsonschema--1.0.sql
REGRESS = jsonschema_valid_api jsonschema_edge_cases
REGRESS_OPTS = --load-extension=pg_jsonschema
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_REGRESS := $(dir $(PGXS))../../src/test/regress/pg_regress
.PHONY installcheck:
installcheck:
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION $(EXTENSION)"
$(PG_REGRESS) --use-existing --dbname=contrib_regression $(REGRESS)
include $(PGXS)

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression roaringbitmap

View File

@@ -1,12 +0,0 @@
#!/bin/bash
set -ex
# For v16 it's required to create a type which is impossible without superuser access
# do not run this test so far
if [[ "${PG_VERSION}" = v16 ]]; then
exit 0
fi
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression base corpus

View File

@@ -6,10 +6,4 @@ export PGOPTIONS = -c pg_session_jwt.jwk={"crv":"Ed25519","kty":"OKP","x":"R_Abz
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_REGRESS := $(dir $(PGXS))../../src/test/regress/pg_regress
.PHONY installcheck:
installcheck:
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION $(EXTENSION)"
$(PG_REGRESS) --use-existing --dbname=contrib_regression $(REGRESS)
include $(PGXS)

View File

@@ -5,6 +5,4 @@ REGRESS = pg_tiktoken
installcheck: regression-test
regression-test:
dropdb --if-exists contrib_regression
createdb contrib_regression
$(PG_REGRESS) --inputdir=. --outputdir=. --use-existing --dbname=contrib_regression $(REGRESS)
$(PG_REGRESS) --inputdir=. --outputdir=. --dbname=contrib_regression $(REGRESS)

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname "${0}")"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --dbname=contrib_regression 001_setup 002_uuid_generate_v7 003_uuid_v7_to_timestamptz 004_uuid_timestamptz_to_v7 005_uuid_v7_to_timestamp 006_uuid_timestamp_to_v7

View File

@@ -1,6 +1,4 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
dropdb --if-exists contrib_regression
createdb contrib_regression
pg_prove -d contrib_regression test.sql
pg_prove test.sql

View File

@@ -1,8 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname "${0}")"
dropdb --if-exist contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION vector" -c "CREATE EXTENSION rag"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --load-extension=vector --load-extension=rag --dbname=contrib_regression basic_functions text_processing api_keys chunking_functions document_processing embedding_api_functions voyageai_functions

View File

@@ -1,10 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
make installcheck || true
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
sed -i '/hastap/d' test/build/run.sch
sed -Ei 's/\b(aretap|enumtap|ownership|privs|usergroup)\b//g' test/build/run.sch
${PG_REGRESS} --use-existing --dbname=contrib_regression --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=879 --schedule test/schedule/main.sch --schedule test/build/run.sch

View File

@@ -1,8 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION vector"
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --use-existing --dbname=contrib_regression bit btree cast copy halfvec hnsw_bit hnsw_halfvec hnsw_sparsevec hnsw_vector ivfflat_bit ivfflat_halfvec ivfflat_vector sparsevec vector_type

View File

@@ -4,21 +4,13 @@ PGFILEDESC = "pgx_ulid - ULID type for PostgreSQL"
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_REGRESS = $(dir $(PGXS))/../../src/test/regress/pg_regress
PG_MAJOR_VERSION := $(word 2, $(subst ., , $(shell $(PG_CONFIG) --version)))
ifeq ($(shell test $(PG_MAJOR_VERSION) -lt 17; echo $$?),0)
REGRESS_OPTS = --load-extension=ulid
REGRESS = 00_ulid_generation 01_ulid_conversions 03_ulid_errors
EXTNAME = ulid
else
REGRESS_OPTS = --load-extension=pgx_ulid
REGRESS = 00_ulid_generation 01_ulid_conversions 02_ulid_conversions 03_ulid_errors
EXTNAME = pgx_ulid
endif
.PHONY: installcheck
installcheck: regression-test
regression-test:
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION $(EXTNAME)"
$(PG_REGRESS) --inputdir=. --outputdir=. --use-existing --dbname=contrib_regression $(REGRESS)
include $(PGXS)

View File

@@ -1,12 +0,0 @@
#!/bin/bash
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
REGRESS="$(make -n installcheck | awk '{print substr($0,index($0,"init-extension"));}')"
REGRESS="${REGRESS/startup_perms/}"
REGRESS="${REGRESS/startup /}"
REGRESS="${REGRESS/find_function_perms/}"
REGRESS="${REGRESS/guc/}"
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression ${REGRESS}

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression extension tables unit binary unicode prefix units time temperature functions language_functions round derived compare aggregate iec custom crosstab convert

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression create_extension prefix falcon explain queries

View File

@@ -3,13 +3,8 @@ MODULE_big = rag_bge_small_en_v15
OBJS = $(patsubst %.rs,%.o,$(wildcard src/*.rs))
REGRESS = basic_functions embedding_functions basic_functions_enhanced embedding_functions_enhanced
REGRESS_OPTS = --load-extension=vector --load-extension=rag_bge_small_en_v15
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_REGRESS := $(dir $(PGXS))../../src/test/regress/pg_regress
.PHONY installcheck:
installcheck:
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION vector" -c "CREATE EXTENSION rag_bge_small_en_v15"
$(PG_REGRESS) --use-existing --dbname=contrib_regression $(REGRESS)
include $(PGXS)

View File

@@ -3,13 +3,8 @@ MODULE_big = rag_jina_reranker_v1_tiny_en
OBJS = $(patsubst %.rs,%.o,$(wildcard src/*.rs))
REGRESS = reranking_functions reranking_functions_enhanced
REGRESS_OPTS = --load-extension=vector --load-extension=rag_jina_reranker_v1_tiny_en
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_REGRESS := $(dir $(PGXS))../../src/test/regress/pg_regress
.PHONY installcheck:
installcheck:
dropdb --if-exists contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "CREATE EXTENSION vector" -c "CREATE EXTENSION rag_jina_reranker_v1_tiny_en"
$(PG_REGRESS) --use-existing --dbname=contrib_regression $(REGRESS)
include $(PGXS)

View File

@@ -1,27 +1,25 @@
-- Reranking function tests
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
round
--------
0.8989
(1 row)
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
array
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon');
rerank_distance
-----------------
{0.8989,1.3018}
0.8989152
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
round
---------
-0.8989
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
rerank_distance
-----------------------
{0.8989152,1.3018152}
(1 row)
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) as x);
array
-------------------
{-0.8989,-1.3018}
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon');
rerank_score
--------------
-0.8989152
(1 row)
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
rerank_score
-------------------------
{-0.8989152,-1.3018152}
(1 row)

View File

@@ -1,41 +1,41 @@
-- Reranking function tests - single passage
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
round
--------
0.8989
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon');
rerank_distance
-----------------
0.8989152
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the tanks fired at the buildings')::NUMERIC,4);
round
--------
1.3018
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the tanks fired at the buildings');
rerank_distance
-----------------
1.3018152
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('query about cats', 'information about felines')::NUMERIC,4);
round
--------
1.3133
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('query about cats', 'information about felines');
rerank_distance
-----------------
1.3133051
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('', 'empty query test')::NUMERIC,4);
round
--------
0.7076
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('', 'empty query test');
rerank_distance
-----------------
0.7075559
(1 row)
-- Reranking function tests - array of passages
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
array
-----------------
{0.8989,1.3018}
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
rerank_distance
-----------------------
{0.8989152,1.3018152}
(1 row)
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases'])) AS x);
array
------------------------
{0.1659,0.3348,0.1013}
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases']);
rerank_distance
------------------------------------
{0.16591403,0.33475375,0.10132827}
(1 row)
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('empty array test', ARRAY[]::text[]);
@@ -45,43 +45,43 @@ SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('empty array test', ARRAY[]:
(1 row)
-- Reranking score function tests - single passage
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
round
---------
-0.8989
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon');
rerank_score
--------------
-0.8989152
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the tanks fired at the buildings')::NUMERIC,4);
round
---------
-1.3018
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the tanks fired at the buildings');
rerank_score
--------------
-1.3018152
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('query about cats', 'information about felines')::NUMERIC,4);
round
---------
-1.3133
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('query about cats', 'information about felines');
rerank_score
--------------
-1.3133051
(1 row)
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('', 'empty query test')::NUMERIC,4);
round
---------
-0.7076
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('', 'empty query test');
rerank_score
--------------
-0.7075559
(1 row)
-- Reranking score function tests - array of passages
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
array
-------------------
{-0.8989,-1.3018}
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
rerank_score
-------------------------
{-0.8989152,-1.3018152}
(1 row)
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases'])) AS x);
array
---------------------------
{-0.1659,-0.3348,-0.1013}
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases']);
rerank_score
---------------------------------------
{-0.16591403,-0.33475375,-0.10132827}
(1 row)
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('empty array test', ARRAY[]::text[]);

View File

@@ -1,10 +1,8 @@
-- Reranking function tests
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon');
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon');
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) as x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);

View File

@@ -1,35 +1,35 @@
-- Reranking function tests - single passage
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the baboon played with the balloon');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the tanks fired at the buildings')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat', 'the tanks fired at the buildings');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('query about cats', 'information about felines')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('query about cats', 'information about felines');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_distance('', 'empty query test')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('', 'empty query test');
-- Reranking function tests - array of passages
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_distance('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases'])) AS x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases']);
SELECT rag_jina_reranker_v1_tiny_en.rerank_distance('empty array test', ARRAY[]::text[]);
-- Reranking score function tests - single passage
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the baboon played with the balloon');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the tanks fired at the buildings')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat', 'the tanks fired at the buildings');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('query about cats', 'information about felines')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('query about cats', 'information about felines');
SELECT ROUND(rag_jina_reranker_v1_tiny_en.rerank_score('', 'empty query test')::NUMERIC,4);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('', 'empty query test');
-- Reranking score function tests - array of passages
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings'])) AS x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('the cat sat on the mat',
ARRAY['the baboon played with the balloon', 'the tanks fired at the buildings']);
SELECT ARRAY(SELECT ROUND(x::NUMERIC,4) FROM unnest(rag_jina_reranker_v1_tiny_en.rerank_score('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases'])) AS x);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('query about programming',
ARRAY['Python is a programming language', 'Java is also a programming language', 'SQL is used for databases']);
SELECT rag_jina_reranker_v1_tiny_en.rerank_score('empty array test', ARRAY[]::text[]);

View File

@@ -1,7 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --use-existing --dbname=contrib_regression rum rum_hash ruminv timestamp orderby orderby_hash altorder altorder_hash limits int2 int4 int8 float4 float8 money oid time timetz date interval macaddr inet cidr text varchar char bytea bit varbit numeric rum_weight expr array

44
docker-compose/run-tests.sh Executable file → Normal file
View File

@@ -1,42 +1,6 @@
#!/bin/bash
set -x
if [[ -v BENCHMARK_CONNSTR ]]; then
uri_no_proto="${BENCHMARK_CONNSTR#postgres://}"
uri_no_proto="${uri_no_proto#postgresql://}"
if [[ $uri_no_proto == *\?* ]]; then
base="${uri_no_proto%%\?*}" # before '?'
else
base="$uri_no_proto"
fi
if [[ $base =~ ^([^:]+):([^@]+)@([^:/]+):?([0-9]*)/(.+)$ ]]; then
export PGUSER="${BASH_REMATCH[1]}"
export PGPASSWORD="${BASH_REMATCH[2]}"
export PGHOST="${BASH_REMATCH[3]}"
export PGPORT="${BASH_REMATCH[4]:-5432}"
export PGDATABASE="${BASH_REMATCH[5]}"
echo export PGUSER="${BASH_REMATCH[1]}"
echo export PGPASSWORD="${BASH_REMATCH[2]}"
echo export PGHOST="${BASH_REMATCH[3]}"
echo export PGPORT="${BASH_REMATCH[4]:-5432}"
echo export PGDATABASE="${BASH_REMATCH[5]}"
else
echo "Invalid PostgreSQL base URI"
exit 1
fi
fi
REGULAR_USER=false
while getopts r arg; do
case $arg in
r)
REGULAR_USER=true
shift $((OPTIND-1))
;;
*) :
;;
esac
done
extdir=${1}
cd "${extdir}" || exit 2
@@ -48,11 +12,6 @@ for d in ${LIST}; do
FAILED="${d} ${FAILED}"
break
fi
if [[ ${REGULAR_USER} = true ]] && [ -f "${d}"/regular-test.sh ]; then
"${d}/regular-test.sh" || FAILED="${d} ${FAILED}"
continue
fi
if [ -f "${d}/neon-test.sh" ]; then
"${d}/neon-test.sh" || FAILED="${d} ${FAILED}"
else
@@ -60,8 +19,5 @@ for d in ${LIST}; do
fi
done
[ -z "${FAILED}" ] && exit 0
for d in ${FAILED}; do
cat "$(find $d -name regression.diffs)"
done
echo "${FAILED}"
exit 1

View File

@@ -13,7 +13,7 @@ For design details see [the RFC](./rfcs/021-metering.md) and [the discussion on
batch format is
```json
{ "events" : [metric1, metric2, ...] }
{ "events" : [metric1, metric2, ...]]}
```
See metric format examples below.
@@ -49,13 +49,11 @@ Size of the remote storage (S3) directory.
This is an absolute, per-tenant metric.
- `timeline_logical_size`
Logical size of the data in the timeline.
Logical size of the data in the timeline
This is an absolute, per-timeline metric.
- `synthetic_storage_size`
Size of all tenant's branches including WAL.
Size of all tenant's branches including WAL
This is the same metric that `tenant/{tenant_id}/size` endpoint returns.
This is an absolute, per-tenant metric.
@@ -108,10 +106,10 @@ This is an incremental, per-endpoint metric.
```
The metric is incremental, so the value is the difference between the current and the previous value.
If there is no previous value, the value is the current value and the `start_time` equals `stop_time`.
If there is no previous value, the value, the value is the current value and the `start_time` equals `stop_time`.
### TODO
- [ ] Handle errors better: currently if one tenant fails to gather metrics, the whole iteration fails and metrics are not sent for any tenant.
- [ ] Add retries
- [ ] Tune the interval
- [ ] Tune the interval

View File

@@ -169,8 +169,6 @@ pub struct TenantDescribeResponseShard {
pub is_pending_compute_notification: bool,
/// A shard split is currently underway
pub is_splitting: bool,
/// A timeline is being imported into this tenant
pub is_importing: bool,
pub scheduling_policy: ShardSchedulingPolicy,

View File

@@ -1803,8 +1803,6 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::sync::LazyLock;
#[derive(
Copy,
Clone,
@@ -1842,33 +1840,35 @@ pub mod virtual_file {
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO for reads only.
/// Uses direct IO, error out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
/// Use direct IO for reads and writes.
#[cfg(target_os = "linux")]
DirectRw,
}
impl IoMode {
pub fn preferred() -> Self {
// The default behavior when running Rust unit tests without any further
// flags is to use the newest behavior (DirectRw).
// flags is to use the newest behavior if available on the platform (Direct).
// The CI uses the following environment variable to unit tests for all
// different modes.
// NB: the Python regression & perf tests have their own defaults management
// that writes pageserver.toml; they do not use this variable.
if cfg!(test) {
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
use once_cell::sync::Lazy;
static CACHED: Lazy<IoMode> = Lazy::new(|| {
utils::env::var_serde_json_string(
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
)
.unwrap_or(
.unwrap_or({
#[cfg(target_os = "linux")]
IoMode::DirectRw,
{
IoMode::Direct
}
#[cfg(not(target_os = "linux"))]
IoMode::Buffered,
)
{
IoMode::Buffered
}
})
});
*CACHED
} else {
@@ -1885,8 +1885,6 @@ pub mod virtual_file {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
#[cfg(target_os = "linux")]
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
x => return Err(x),
})
}

View File

@@ -106,7 +106,6 @@ hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
indoc.workspace = true
uuid.workspace = true
rstest.workspace = true
[[bench]]
name = "bench_layer_map"

View File

@@ -61,7 +61,7 @@ async fn ingest(
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx =
let ctx2 =
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
let gate = utils::sync::gate::Gate::default();
@@ -248,8 +248,6 @@ fn criterion_benchmark(c: &mut Criterion) {
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
#[cfg(target_os = "linux")]
IoMode::DirectRw,
] {
for param in expect.clone() {
let HandPickedParameters {
@@ -311,114 +309,78 @@ cargo bench --bench bench_ingest
im4gn.2xlarge:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.2901 s 1.2943 s 1.2991 s]
thrpt: [98.533 MiB/s 98.892 MiB/s 99.220 MiB/s]
time: [1.8491 s 1.8540 s 1.8592 s]
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.1387 s 2.1623 s 2.1845 s]
thrpt: [58.595 MiB/s 59.197 MiB/s 59.851 MiB/s]
time: [2.6976 s 2.7123 s 2.7286 s]
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.2036 s 1.2074 s 1.2122 s]
thrpt: [105.60 MiB/s 106.01 MiB/s 106.35 MiB/s]
time: [1.7433 s 1.7510 s 1.7600 s]
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [520.55 ms 521.46 ms 522.57 ms]
thrpt: [244.94 MiB/s 245.47 MiB/s 245.89 MiB/s]
time: [499.63 ms 500.07 ms 500.46 ms]
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [440.33 ms 442.24 ms 444.10 ms]
thrpt: [288.22 MiB/s 289.43 MiB/s 290.69 MiB/s]
time: [456.97 ms 459.61 ms 461.92 ms]
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [168.78 ms 169.42 ms 170.18 ms]
thrpt: [752.16 MiB/s 755.52 MiB/s 758.40 MiB/s]
time: [158.82 ms 159.16 ms 159.56 ms]
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.2978 s 1.3094 s 1.3227 s]
thrpt: [96.775 MiB/s 97.758 MiB/s 98.632 MiB/s]
time: [1.8856 s 1.8997 s 1.9179 s]
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.1976 s 2.2067 s 2.2154 s]
thrpt: [57.777 MiB/s 58.006 MiB/s 58.245 MiB/s]
time: [2.7468 s 2.7625 s 2.7785 s]
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.2103 s 1.2160 s 1.2233 s]
thrpt: [104.64 MiB/s 105.26 MiB/s 105.76 MiB/s]
time: [1.7689 s 1.7726 s 1.7767 s]
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [525.05 ms 526.37 ms 527.79 ms]
thrpt: [242.52 MiB/s 243.17 MiB/s 243.79 MiB/s]
time: [497.64 ms 498.60 ms 499.67 ms]
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [443.06 ms 444.88 ms 447.15 ms]
thrpt: [286.26 MiB/s 287.72 MiB/s 288.90 MiB/s]
time: [493.72 ms 505.07 ms 518.03 ms]
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [169.40 ms 169.80 ms 170.17 ms]
thrpt: [752.21 MiB/s 753.81 MiB/s 755.60 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.2844 s 1.2915 s 1.2990 s]
thrpt: [98.536 MiB/s 99.112 MiB/s 99.657 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.1431 s 2.1663 s 2.1900 s]
thrpt: [58.446 MiB/s 59.087 MiB/s 59.726 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1906 s 1.1926 s 1.1947 s]
thrpt: [107.14 MiB/s 107.33 MiB/s 107.51 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [516.86 ms 518.25 ms 519.47 ms]
thrpt: [246.40 MiB/s 246.98 MiB/s 247.65 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [536.50 ms 536.53 ms 536.60 ms]
thrpt: [238.54 MiB/s 238.57 MiB/s 238.59 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.77 ms 267.90 ms 268.04 ms]
thrpt: [477.53 MiB/s 477.79 MiB/s 478.02 MiB/s]
time: [267.76 ms 267.85 ms 267.96 ms]
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
Hetzner AX102:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [836.58 ms 861.93 ms 886.57 ms]
thrpt: [144.38 MiB/s 148.50 MiB/s 153.00 MiB/s]
time: [1.0683 s 1.1006 s 1.1386 s]
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.2782 s 1.3191 s 1.3665 s]
thrpt: [93.668 MiB/s 97.037 MiB/s 100.14 MiB/s]
time: [1.5719 s 1.6012 s 1.6228 s]
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [791.27 ms 807.08 ms 822.95 ms]
thrpt: [155.54 MiB/s 158.60 MiB/s 161.77 MiB/s]
time: [1.1095 s 1.1331 s 1.1580 s]
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [310.78 ms 314.66 ms 318.47 ms]
thrpt: [401.92 MiB/s 406.79 MiB/s 411.87 MiB/s]
time: [303.20 ms 307.83 ms 311.90 ms]
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [377.11 ms 387.77 ms 399.21 ms]
thrpt: [320.63 MiB/s 330.10 MiB/s 339.42 MiB/s]
time: [406.34 ms 429.37 ms 451.63 ms]
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [128.37 ms 132.96 ms 138.55 ms]
thrpt: [923.83 MiB/s 962.69 MiB/s 997.11 MiB/s]
time: [134.01 ms 135.78 ms 137.48 ms]
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [900.38 ms 914.88 ms 928.86 ms]
thrpt: [137.80 MiB/s 139.91 MiB/s 142.16 MiB/s]
time: [1.0406 s 1.0580 s 1.0772 s]
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.2538 s 1.2936 s 1.3313 s]
thrpt: [96.149 MiB/s 98.946 MiB/s 102.09 MiB/s]
time: [1.5059 s 1.5339 s 1.5625 s]
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [787.17 ms 803.89 ms 820.63 ms]
thrpt: [155.98 MiB/s 159.23 MiB/s 162.61 MiB/s]
time: [1.0714 s 1.0934 s 1.1161 s]
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [318.78 ms 321.89 ms 324.74 ms]
thrpt: [394.16 MiB/s 397.65 MiB/s 401.53 MiB/s]
time: [262.68 ms 265.14 ms 267.71 ms]
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [374.01 ms 383.45 ms 393.20 ms]
thrpt: [325.53 MiB/s 333.81 MiB/s 342.24 MiB/s]
time: [375.19 ms 393.80 ms 411.40 ms]
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [137.98 ms 141.31 ms 143.57 ms]
thrpt: [891.58 MiB/s 905.79 MiB/s 927.66 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [613.69 ms 622.48 ms 630.97 ms]
thrpt: [202.86 MiB/s 205.63 MiB/s 208.57 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.0299 s 1.0766 s 1.1273 s]
thrpt: [113.55 MiB/s 118.90 MiB/s 124.29 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [637.80 ms 647.78 ms 658.01 ms]
thrpt: [194.53 MiB/s 197.60 MiB/s 200.69 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [266.09 ms 267.20 ms 268.31 ms]
thrpt: [477.06 MiB/s 479.04 MiB/s 481.04 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [269.34 ms 273.27 ms 277.69 ms]
thrpt: [460.95 MiB/s 468.40 MiB/s 475.24 MiB/s]
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.18 ms 124.24 ms 125.15 ms]
thrpt: [1022.8 MiB/s 1.0061 GiB/s 1.0148 GiB/s]
time: [123.02 ms 123.85 ms 124.66 ms]
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
*/

View File

@@ -27,9 +27,6 @@ pub(super) enum Name {
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
/// Timeline delta from parent (WAL bytes clamped to logical size)
#[serde(rename = "timeline_changed_bytes_from_parent")]
ChangedBytesFromParent,
/// Tenant remote size
#[serde(rename = "remote_storage_size")]
RemoteSize,
@@ -178,24 +175,6 @@ impl MetricsKey {
.absolute_values()
}
/// [`Timeline::get_last_record_lsn`] - [`Timeline::get_ancestor_lsn`], clamped to
/// [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
/// [`Timeline::get_ancestor_lsn`]: crate::tenant::Timeline::get_ancestor_lsn
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_changed_bytes_from_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::ChangedBytesFromParent,
}
.absolute_values()
}
/// [`TenantShard::remote_size`]
///
/// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
@@ -392,7 +371,6 @@ struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
changed_bytes_from_parent: Option<u64>,
}
impl TimelineSnapshot {
@@ -428,22 +406,10 @@ impl TimelineSnapshot {
}
};
// This is an approximation of how much data has changed on this branch vs. its
// ancestor: the number of bytes written to the WAL, clamped to the size of the branch.
let changed_bytes_from_parent = current_exact_logical_size.and_then(|size| {
if t.get_ancestor_lsn() == Lsn::MAX {
return None;
}
t.get_last_record_lsn()
.checked_sub(t.get_ancestor_lsn())
.map(|wal_bytes| wal_bytes.0.min(size))
});
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
changed_bytes_from_parent,
}))
}
}
@@ -521,17 +487,6 @@ impl TimelineSnapshot {
metrics.push(factory.at(now, size));
}
}
{
let factory = MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id);
let current_or_previous = self
.changed_bytes_from_parent
.or_else(|| cache.get(factory.key()).map(|item| item.value));
if let Some(size) = current_or_previous {
metrics.push(factory.at(now, size));
}
}
}
}

View File

@@ -18,7 +18,6 @@ fn startup_collected_timeline_metrics_before_advancing() {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
@@ -34,8 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -62,7 +60,6 @@ fn startup_collected_timeline_metrics_second_round() {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -72,8 +69,7 @@ fn startup_collected_timeline_metrics_second_round() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -108,7 +104,6 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
changed_bytes_from_parent: Some(0x1000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -118,8 +113,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -147,7 +141,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
changed_bytes_from_parent: None,
};
let mut cache = HashMap::from([
@@ -209,7 +202,6 @@ fn post_restart_current_exact_logical_size_uses_cached() {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
changed_bytes_from_parent: Some(0x1000),
};
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)

View File

@@ -1289,7 +1289,6 @@ pub(crate) enum StorageIoOperation {
Seek,
Fsync,
Metadata,
SetLen,
}
impl StorageIoOperation {
@@ -1304,7 +1303,6 @@ impl StorageIoOperation {
StorageIoOperation::Seek => "seek",
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
}
}
}

View File

@@ -3816,24 +3816,6 @@ impl TenantShard {
MaybeDeletedIndexPart::IndexPart(p) => p,
};
// A shard split may not take place while a timeline import is on-going
// for the tenant. Timeline imports run as part of each tenant shard
// and rely on the sharding scheme to split the work among pageservers.
// If we were to split in the middle of this process, we would have to
// either ensure that it's driven to completion on the old shard set
// or transfer it to the new shard set. It's technically possible, but complex.
match index_part.import_pgdata {
Some(ref import) if !import.is_done() => {
anyhow::bail!(
"Cannot split due to import with idempotency key: {:?}",
import.idempotency_key()
);
}
Some(_) | None => {
// fallthrough
}
}
for child_shard in child_shards {
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(

View File

@@ -15,23 +15,21 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::Error;
use anyhow::Context;
use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::IoBuf;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter};
#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
@@ -52,9 +50,12 @@ pub struct Header {
impl Header {
/// Decodes a header from a byte slice.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
let Some(&first_header_byte) = bytes.first() else {
anyhow::bail!("zero-length blob header");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"zero-length blob header",
));
};
// If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
@@ -68,9 +69,12 @@ impl Header {
// Otherwise, this is a 4-byte header containing compression information and length.
const HEADER_LEN: usize = 4;
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN]
.try_into()
.map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?;
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("blob header too short: {bytes:?}"),
)
})?;
// TODO: verify the compression bits and convert to an enum.
let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
@@ -90,16 +94,6 @@ impl Header {
}
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
WriteBlobRaw(anyhow::Error),
}
impl BlockCursor<'_> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -219,64 +213,143 @@ pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// A wrapper of `VirtualFile` that allows users to write blobs.
pub struct BlobWriter<W> {
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: TempVirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
/// We do tiny writes for the length headers; they need to be in an owned buffer;
io_buf: Option<BytesMut>,
writer: BufferedWriter<IoBufferMut, W>,
offset: u64,
}
impl<W> BlobWriter<W>
where
W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
{
/// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(
file: W,
inner: TempVirtualFile,
start_offset: u64,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
cancel,
ctx,
flush_task_span,
),
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
})
buf: Vec::with_capacity(Self::CAPACITY),
io_buf: Some(BytesMut::new()),
}
}
pub fn size(&self) -> u64 {
self.offset
}
const CAPACITY: usize = 64 * 1024;
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
/// Writes `src_buf` to the file at the current offset.
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
};
self.offset += nbytes as u64;
(src_buf, Ok(()))
}
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
res?;
let mut buf = slice.into_raw_slice().into_inner();
buf.clear();
self.buf = buf;
Ok(())
}
#[inline(always)]
/// Writes as much of `src_buf` into the internal buffer as it fits
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
let remaining = Self::CAPACITY - self.buf.len();
let to_copy = src_buf.len().min(remaining);
self.buf.extend_from_slice(&src_buf[..to_copy]);
self.offset += to_copy as u64;
to_copy
}
/// Internal, possibly buffered, write function
async fn write_all<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map(|len| {
self.offset += len as u64;
});
) -> (FullSlice<Buf>, Result<(), Error>) {
let src_buf = src_buf.into_raw_slice();
let src_buf_bounds = src_buf.bounds();
let restore = move |src_buf_slice: Slice<_>| {
FullSlice::must_new(Slice::from_buf_bounds(
src_buf_slice.into_inner(),
src_buf_bounds,
))
};
(src_buf, res)
if !BUFFERED {
assert!(self.buf.is_empty());
return self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (restore(src_buf), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(&src_buf);
src_buf = src_buf.slice(copied..);
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(ctx).await {
return (restore(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
let src_buf = if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
restore(src_buf)
} else {
let (src_buf, res) = self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
} else {
restore(src_buf)
};
(src_buf, Ok(()))
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -285,7 +358,7 @@ where
&mut self,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
) -> (FullSlice<Buf>, Result<u64, Error>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
@@ -299,10 +372,7 @@ where
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (
FullSlice<Buf>,
Result<(u64, CompressionInfo), WriteBlobError>,
) {
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
@@ -318,16 +388,14 @@ where
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),
Err(WriteBlobError::BlobTooLarge { len }),
Err(Error::other(format!("blob too large ({len} bytes)"))),
),
srcbuf,
);
@@ -361,9 +429,7 @@ where
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
}
}
.await;
@@ -378,7 +444,6 @@ where
} else {
self.write_all(srcbuf, ctx).await
};
let res = res.map_err(WriteBlobError::Flush);
(srcbuf, res.map(|_| (offset, compression_info)))
}
@@ -387,12 +452,9 @@ where
&mut self,
raw_with_header: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
) -> (FullSlice<Buf>, Result<u64, Error>) {
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header)
.context("decoding blob header")
.map_err(WriteBlobError::WriteBlobRaw)
{
let header = match Header::decode(&raw_with_header) {
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
};
@@ -401,26 +463,29 @@ where
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
"header length mismatch: {header_total_len} != {raw_len}"
))),
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("header length mismatch: {header_total_len} != {raw_len}"),
)),
);
}
let offset = self.offset;
let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
let result = result.map_err(WriteBlobError::Flush);
(raw_with_header, result.map(|_| offset))
}
}
/// Finish this blob writer and return the underlying `W`.
pub async fn shutdown(
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
Ok(file)
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
///
/// If there is an internal buffer (depends on `BUFFERED`), it will
/// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
if BUFFERED {
self.flush_buffer(ctx).await?;
}
Ok(self.inner)
}
}
@@ -429,25 +494,22 @@ pub(crate) mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};
use tracing::info_span;
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::VirtualFile;
async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
round_trip_test_compressed(blobs, false).await
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
}
pub(crate) async fn write_maybe_compressed(
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
ctx: &RequestContext,
) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
@@ -457,18 +519,10 @@ pub(crate) mod tests {
let mut offsets = Vec::new();
{
let file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
pathbuf.as_path(),
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await?,
gate.enter()?,
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -485,28 +539,28 @@ pub(crate) mod tests {
let offs = res?;
offsets.push(offs);
}
let file = wtr
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
file.disarm_into_inner()
};
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
}
Ok((temp_dir, pathbuf, offsets))
}
async fn round_trip_test_compressed(
async fn round_trip_test_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> anyhow::Result<()> {
) -> Result<(), Error> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed(blobs, compression, &ctx).await?;
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
println!("Done writing!");
let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
let file = VirtualFile::open(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -525,27 +579,30 @@ pub(crate) mod tests {
}
#[tokio::test]
async fn test_one() -> anyhow::Result<()> {
async fn test_one() -> Result<(), Error> {
let blobs = &[vec![12, 21, 22]];
round_trip_test(blobs).await?;
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_hello_simple() -> anyhow::Result<()> {
async fn test_hello_simple() -> Result<(), Error> {
let blobs = &[
vec![0, 1, 2, 3],
b"Hello, World!".to_vec(),
Vec::new(),
b"foobar".to_vec(),
];
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_really_big_array() -> anyhow::Result<()> {
async fn test_really_big_array() -> Result<(), Error> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -554,22 +611,25 @@ pub(crate) mod tests {
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_inc() -> anyhow::Result<()> {
async fn test_arrays_inc() -> Result<(), Error> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test(&blobs).await?;
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_random_size() -> anyhow::Result<()> {
async fn test_arrays_random_size() -> Result<(), Error> {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
@@ -581,18 +641,20 @@ pub(crate) mod tests {
random_array(sz.into())
})
.collect::<Vec<_>>();
round_trip_test(&blobs).await?;
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
Ok(())
}
#[tokio::test]
async fn test_arrays_page_boundary() -> anyhow::Result<()> {
async fn test_arrays_page_boundary() -> Result<(), Error> {
let blobs = &[
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
];
round_trip_test(blobs).await?;
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
Ok(())
}
}

View File

@@ -4,12 +4,14 @@
use std::ops::Deref;
use bytes::Bytes;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{IoBuffer, VirtualFile};
use crate::virtual_file::VirtualFile;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -245,17 +247,17 @@ pub trait BlockWriter {
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
}
///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<IoBuffer>,
pub blocks: Vec<Bytes>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);

View File

@@ -25,7 +25,7 @@ use std::{io, result};
use async_stream::try_stream;
use byteorder::{BE, ReadBytesExt};
use bytes::BufMut;
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use futures::{Stream, StreamExt};
use hex;
@@ -34,7 +34,6 @@ use tracing::error;
use crate::context::RequestContext;
use crate::tenant::block_io::{BlockReader, BlockWriter};
use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
@@ -788,12 +787,12 @@ impl<const L: usize> BuildNode<L> {
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> IoBuffer {
fn pack(&self) -> Bytes {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
let mut buf = BytesMut::new();
buf.put_u16(self.num_children);
buf.put_u8(self.level);
@@ -806,7 +805,7 @@ impl<const L: usize> BuildNode<L> {
assert!(buf.len() == self.size);
assert!(buf.len() <= PAGE_SZ);
buf.extend_with(0, PAGE_SZ - buf.len());
buf.resize(PAGE_SZ, 0);
buf.freeze()
}
@@ -840,7 +839,7 @@ pub(crate) mod tests {
#[derive(Clone, Default)]
pub(crate) struct TestDisk {
blocks: Vec<IoBuffer>,
blocks: Vec<Bytes>,
}
impl TestDisk {
fn new() -> Self {
@@ -858,7 +857,7 @@ pub(crate) mod tests {
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)

View File

@@ -79,9 +79,9 @@ impl EphemeralFile {
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
.create_new(true)
.read(true)
.write(true),
.write(true)
.create(true),
ctx,
)
.await?,
@@ -98,7 +98,6 @@ impl EphemeralFile {
file: file.clone(),
buffered_writer: BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
@@ -131,14 +130,6 @@ impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
fn set_len(
&self,
len: u64,
ctx: &RequestContext,
) -> impl Future<Output = std::io::Result<()>> + Send {
self.inner.set_len(len, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {

View File

@@ -91,7 +91,9 @@ pub async fn download_layer_file<'a>(
);
let temp_file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
@@ -195,7 +197,6 @@ async fn download_object(
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
0,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
@@ -218,15 +219,10 @@ async fn download_object(
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
buffered
.shutdown(
owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
ctx,
)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;

View File

@@ -1521,11 +1521,12 @@ async fn load_heatmap(
path: &Utf8PathBuf,
ctx: &RequestContext,
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
let st = match VirtualFile::read_to_string(path, ctx).await {
Ok(st) => st,
let mut file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => Err(e)?,
};
let st = file.read_to_string(ctx).await?;
let htm = serde_json::from_str(&st)?;
Ok(Some(htm))
}

View File

@@ -29,6 +29,7 @@
//!
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
@@ -51,7 +52,6 @@ use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -75,8 +75,7 @@ use crate::tenant::vectored_blob_io::{
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -114,15 +113,6 @@ impl From<&DeltaLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -402,7 +392,7 @@ struct DeltaLayerWriterInner {
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: BlobWriter<TempVirtualFile>,
blob_writer: BlobWriter<true>,
// Number of key-lsns in the layer.
num_keys: usize,
@@ -426,29 +416,16 @@ impl DeltaLayerWriterInner {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
// rename it when we're done.
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await?,
gate.enter()?,
);
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -542,24 +519,15 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
.blob_writer
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
let mut file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -574,9 +542,11 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
let metadata = file
@@ -768,7 +738,7 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options_v2(
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -785,8 +755,11 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
Ok(())
}
@@ -1442,19 +1415,6 @@ impl DeltaLayerInner {
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -1464,7 +1424,10 @@ impl DeltaLayerInner {
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
}
}

View File

@@ -27,6 +27,7 @@
//! actual page images are stored in the "values" part.
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
@@ -49,7 +50,6 @@ use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -73,8 +73,7 @@ use crate::tenant::vectored_blob_io::{
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -113,15 +112,6 @@ impl From<&ImageLayer> for Summary {
}
impl Summary {
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -363,7 +353,7 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options_v2(
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -380,8 +370,11 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
Ok(())
}
@@ -685,19 +678,6 @@ impl ImageLayerInner {
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -707,7 +687,10 @@ impl ImageLayerInner {
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
key_values_batch: VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
}
}
@@ -760,7 +743,7 @@ struct ImageLayerWriterInner {
// Number of keys in the layer.
num_keys: usize,
blob_writer: BlobWriter<TempVirtualFile>,
blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
#[cfg(feature = "testing")]
@@ -794,27 +777,20 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let file = TempVirtualFile::new(
VirtualFile::open_with_options_v2(
let mut file = TempVirtualFile::new(
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
.write(true)
.create_new(true),
ctx,
)
.await?,
gate.enter()?,
);
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -942,24 +918,15 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
let file = self
.blob_writer
.shutdown(
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await?;
let mut file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
offset += PAGE_SZ as u64;
}
let final_key_range = if let Some(end_key) = end_key {
@@ -980,9 +947,11 @@ impl ImageLayerWriterInner {
index_root_blk,
};
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
res?;
let metadata = file

View File

@@ -19,7 +19,6 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
@@ -27,22 +26,6 @@ impl<'a> LayerRef<'a> {
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> LayerIterRef<'a> {
match self {
Self::Image(x) => {
LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
Self::Delta(x) => {
LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
}
}
}
fn layer_dbg_info(&self) -> String {
match self {
Self::Image(x) => x.layer_dbg_info(),
@@ -83,8 +66,6 @@ pub(crate) enum IteratorWrapper<'a> {
first_key_lower_bound: (Key, Lsn),
layer: LayerRef<'a>,
source_desc: Arc<PersistentLayerKey>,
max_read_size: u64,
max_batch_size: usize,
},
Loaded {
iter: PeekableLayerIterRef<'a>,
@@ -165,8 +146,6 @@ impl<'a> IteratorWrapper<'a> {
pub fn create_from_image_layer(
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Image(image_layer),
@@ -178,16 +157,12 @@ impl<'a> IteratorWrapper<'a> {
is_delta: false,
}
.into(),
max_read_size,
max_batch_size,
}
}
pub fn create_from_delta_layer(
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
Self::NotLoaded {
layer: LayerRef::Delta(delta_layer),
@@ -199,8 +174,6 @@ impl<'a> IteratorWrapper<'a> {
is_delta: true,
}
.into(),
max_read_size,
max_batch_size,
}
}
@@ -231,13 +204,11 @@ impl<'a> IteratorWrapper<'a> {
first_key_lower_bound,
layer,
source_desc,
max_read_size,
max_batch_size,
} = self
else {
unreachable!()
};
let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
let iter = layer.iter(ctx);
let iter = PeekableLayerIterRef::create(iter).await?;
if let Some((k1, l1, _)) = iter.peek() {
let (k2, l2) = first_key_lower_bound;
@@ -322,41 +293,21 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
max_read_size: u64,
max_batch_size: usize,
) -> Self {
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(
image,
ctx,
max_read_size,
max_batch_size,
));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(
delta,
ctx,
max_read_size,
max_batch_size,
));
}
Self {
heap: BinaryHeap::from(heap),
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
}
Self {
heap: BinaryHeap::from(heap),
}
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {

View File

@@ -2828,41 +2828,6 @@ impl Timeline {
Ok(())
}
/// Check if the memory usage is within the limit.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
) -> Result<(), CompactionError> {
let mut estimated_memory_usage_mb = 0.0;
let mut num_image_layers = 0;
let mut num_delta_layers = 0;
let target_layer_size_bytes = 256 * 1024 * 1024;
for layer in layer_selection {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Multiply the layer size so that tests can pass.
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;
} else {
// Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
estimated_memory_usage_mb +=
5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_image_layers += 1;
}
}
if estimated_memory_usage_mb > 1024.0 {
return Err(CompactionError::Other(anyhow!(
"estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
estimated_memory_usage_mb,
num_image_layers,
num_delta_layers
)));
}
Ok(())
}
/// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
/// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
@@ -3299,17 +3264,6 @@ impl Timeline {
self.check_compaction_space(&job_desc.selected_layers)
.await?;
self.check_memory_usage(&job_desc.selected_layers).await?;
if job_desc.selected_layers.len() > 100
&& job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
{
return Err(CompactionError::Other(anyhow!(
"too many layers to rewrite: {} / {}, giving up compaction",
job_desc.rewrite_layers.len(),
job_desc.selected_layers.len()
)));
}
// Generate statistics for the compaction
for layer in &job_desc.selected_layers {
let desc = layer.layer_desc();
@@ -3405,13 +3359,7 @@ impl Timeline {
.context("failed to collect gc compaction keyspace")
.map_err(CompactionError::Other)?;
let mut merge_iter = FilterIterator::create(
MergeIterator::create_with_options(
&delta_layers,
&image_layers,
ctx,
128 * 8192, /* 1MB buffer for each of the inner iterators */
128,
),
MergeIterator::create(&delta_layers, &image_layers, ctx),
dense_ks,
sparse_ks,
)

View File

@@ -507,9 +507,7 @@ impl<'a> VectoredBlobReader<'a> {
for (blob_start, meta) in blobs_at.iter().copied() {
let header_start = (blob_start - read.start) as usize;
let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| {
std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err)
})?;
let header = Header::decode(&buf[header_start..])?;
let data_start = header_start + header.header_len;
let end = data_start + header.data_len;
let compression_bits = header.compression_bits;
@@ -664,6 +662,7 @@ impl StreamingVectoredReadPlanner {
#[cfg(test)]
mod tests {
use anyhow::Error;
use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;
@@ -946,16 +945,13 @@ mod tests {
}
}
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> anyhow::Result<()> {
async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed(blobs, compression, &ctx).await?;
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
let file = VirtualFile::open(&pathbuf, &ctx).await?;
let file_len = std::fs::metadata(&pathbuf)?.len();
// Multiply by two (compressed data might need more space), and add a few bytes for the header
@@ -1001,7 +997,7 @@ mod tests {
}
#[tokio::test]
async fn test_really_big_array() -> anyhow::Result<()> {
async fn test_really_big_array() -> Result<(), Error> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -1016,7 +1012,7 @@ mod tests {
}
#[tokio::test]
async fn test_arrays_inc() -> anyhow::Result<()> {
async fn test_arrays_inc() -> Result<(), Error> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();

View File

@@ -12,11 +12,10 @@
//! src/backend/storage/file/fd.c
//!
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use camino::{Utf8Path, Utf8PathBuf};
@@ -97,38 +96,69 @@ impl VirtualFile {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::create(path, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let mode = get_io_mode();
let set_o_direct = match (mode, open_options.is_write()) {
(IoMode::Buffered, _) => false,
#[cfg(target_os = "linux")]
(IoMode::Direct, false) => true,
#[cfg(target_os = "linux")]
(IoMode::Direct, true) => false,
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
let open_options = open_options.clone();
let open_options = if set_o_direct {
#[cfg(target_os = "linux")]
{
let mut open_options = open_options;
open_options.custom_flags(nix::libc::O_DIRECT);
open_options
let file = match get_io_mode() {
IoMode::Buffered => {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
VirtualFile {
inner,
_mode: IoMode::Buffered,
}
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let inner = VirtualFileInner::open_with_options(
path,
open_options.clone().custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
VirtualFile {
inner,
_mode: IoMode::Direct,
}
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
} else {
open_options
};
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
@@ -157,14 +187,18 @@ impl VirtualFile {
self.inner.sync_data().await
}
pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
self.inner.set_len(len, ctx).await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
pub fn remove(self) {
self.inner.remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner.seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
@@ -195,31 +229,25 @@ impl VirtualFile {
self.inner.write_all_at(buf, offset, ctx).await
}
pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
path: P,
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<String> {
let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner.write_all(buf, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner.read_to_end(buf, ctx).await
}
pub(crate) async fn read_to_string(
&mut self,
ctx: &RequestContext,
) -> Result<String, anyhow::Error> {
let mut buf = Vec::new();
let mut tmp = vec![0; 128];
let mut pos: u64 = 0;
loop {
let slice = tmp.slice(..128);
let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
match res {
Ok(0) => break,
Ok(n) => {
pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
String::from_utf8(buf).map_err(|_| {
std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
})
self.read_to_end(&mut buf, ctx).await?;
Ok(String::from_utf8(buf)?)
}
}
@@ -266,6 +294,9 @@ pub struct VirtualFileInner {
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,
/// Current file position
pos: u64,
/// File path and options to use to open it.
///
/// Note: this only contains the options needed to re-open it. For example,
@@ -530,7 +561,21 @@ impl VirtualFileInner {
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
/// Open a file with given options.
@@ -540,7 +585,7 @@ impl VirtualFileInner {
/// on the first time. Make sure that's sane!
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: OpenOptions,
open_options: &OpenOptions,
_ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
let path = path.as_ref();
@@ -565,6 +610,7 @@ impl VirtualFileInner {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path.to_owned(),
open_options: reopen_options,
};
@@ -631,13 +677,6 @@ impl VirtualFileInner {
})
}
pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
res.maybe_fatal_err("set_len")
})
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
@@ -705,6 +744,38 @@ impl VirtualFileInner {
})
}
pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
}
Ok(self.pos)
}
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
@@ -788,7 +859,59 @@ impl VirtualFileInner {
(restore(buf), Ok(()))
}
pub(super) async fn read_at<Buf>(
/// Writes `buf` to the file at the current offset.
///
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let nbytes = buf.len();
let mut buf = buf;
while !buf.is_empty() {
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (restore(buf), Err(e)),
}
}
(restore(buf), Ok(nbytes))
}
async fn write<B: IoBuf + Send>(
&mut self,
buf: FullSlice<B>,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
self.pos += n as u64;
(buf, Ok(n))
}
pub(crate) async fn read_at<Buf>(
&self,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
@@ -816,11 +939,23 @@ impl VirtualFileInner {
})
}
/// The function aborts the process if the error is fatal.
async fn write_at<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}
async fn write_at_inner<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -829,13 +964,30 @@ impl VirtualFileInner {
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
let result = result.maybe_fatal_err("write_at");
if let Ok(size) = result {
ctx.io_size_metrics().write.add(size.into_u64());
}
(buf, result)
})
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1050,6 +1202,19 @@ impl FileGuard {
let _ = file.into_raw_fd();
res
}
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}
impl tokio_epoll_uring::IoFd for FileGuard {
@@ -1139,9 +1304,6 @@ impl OwnedAsyncWriter for VirtualFile {
) -> (FullSlice<Buf>, std::io::Result<()>) {
VirtualFile::write_all_at(self, buf, offset, ctx).await
}
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
VirtualFile::set_len(self, len, ctx).await
}
}
impl OpenFiles {
@@ -1206,7 +1368,8 @@ pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment()
pub(crate) type IoPageSlice<'a> =
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
pub fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
@@ -1220,6 +1383,7 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
#[cfg(test)]
mod tests {
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
@@ -1272,6 +1436,43 @@ mod tests {
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf, ctx).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf, ctx).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
file.read_to_string(&mut buf)?;
}
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(
@@ -1307,7 +1508,7 @@ mod tests {
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
@@ -1367,23 +1568,48 @@ mod tests {
.await?;
file_a
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
.write_all(b"foobar".to_vec().slice_len(), &ctx)
.await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
let _ = file_a.read_string(&ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
.write_all(b"bar".to_vec().slice_len(), &ctx)
.await
.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
assert_eq!("foobar", file_a.read_string(&ctx).await?);
// It's positioned at the EOF now.
assert_eq!("", file_a.read_string(&ctx).await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
@@ -1409,6 +1635,9 @@ mod tests {
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
let mut vfiles = Vec::new();
for _ in 0..100 {
@@ -1418,7 +1647,7 @@ mod tests {
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
vfiles.push(vfile);
}
@@ -1426,8 +1655,8 @@ mod tests {
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again.
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
@@ -1466,7 +1695,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true).clone(),
OpenOptions::new().read(true),
&ctx,
)
.await?;
@@ -1521,7 +1750,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
@@ -1530,7 +1759,7 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
@@ -1555,7 +1784,7 @@ mod tests {
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);

View File

@@ -209,27 +209,6 @@ impl IoEngine {
}
}
}
pub(super) async fn set_len(
&self,
file_guard: FileGuard,
len: u64,
) -> (FileGuard, std::io::Result<()>) {
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
}
}
pub(super) async fn write_at<B: IoBuf + Send>(
&self,
file_guard: FileGuard,

View File

@@ -6,12 +6,7 @@ use std::path::Path;
use super::io_engine::IoEngine;
#[derive(Debug, Clone)]
pub struct OpenOptions {
write: bool,
inner: Inner,
}
#[derive(Debug, Clone)]
enum Inner {
pub enum OpenOptions {
StdFs(std::fs::OpenOptions),
#[cfg(target_os = "linux")]
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
@@ -19,17 +14,13 @@ enum Inner {
impl Default for OpenOptions {
fn default() -> Self {
let inner = match super::io_engine::get() {
match super::io_engine::get() {
IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => Inner::StdFs(std::fs::OpenOptions::new()),
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
Inner::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
}
};
Self {
write: false,
inner,
}
}
}
@@ -39,17 +30,13 @@ impl OpenOptions {
Self::default()
}
pub(super) fn is_write(&self) -> bool {
self.write
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.read(read);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.read(read);
}
}
@@ -57,13 +44,12 @@ impl OpenOptions {
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
self.write = write;
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.write(write);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.write(write);
}
}
@@ -71,12 +57,12 @@ impl OpenOptions {
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create(create);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.create(create);
}
}
@@ -84,12 +70,12 @@ impl OpenOptions {
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create_new(create_new);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.create_new(create_new);
}
}
@@ -97,12 +83,12 @@ impl OpenOptions {
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.truncate(truncate);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.truncate(truncate);
}
}
@@ -110,10 +96,10 @@ impl OpenOptions {
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
@@ -128,12 +114,12 @@ impl OpenOptions {
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.mode(mode);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.mode(mode);
}
}
@@ -141,12 +127,12 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(flags);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
OpenOptions::TokioEpollUring(x) => {
let _ = x.custom_flags(flags);
}
}

View File

@@ -282,17 +282,6 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
}
}
impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {

View File

@@ -1,19 +1,14 @@
mod flush;
use bytes::BufMut;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
pub(crate) use flush::FlushTaskError;
use flush::ShutdownRequest;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use super::io_buf_aligned::IoBufAligned;
use super::io_buf_aligned::IoBufAlignedMut;
use super::io_buf_ext::{FullSlice, IoBufExt};
use crate::context::RequestContext;
use crate::virtual_file::UsizeIsU64;
use crate::virtual_file::{IoBuffer, IoBufferMut};
pub(crate) trait CheapCloneForRead {
@@ -38,49 +33,12 @@ pub trait OwnedAsyncWriter {
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
fn set_len(
&self,
len: u64,
ctx: &RequestContext,
) -> impl Future<Output = std::io::Result<()>> + Send;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
/// This guarantees that writes to the filesystem happen
/// - at offsets that are multiples of [`Buffer::cap`]
/// - in lengths that are multiples of [`Buffer::cap`]
///
/// Above property is useful for Direct IO, where whatever the
/// effectively dominating disk-sector/filesystem-block/memory-page size
/// determines the requirements on
/// - the alignment of the pointer passed to the read/write operation
/// - the value of `count` (i.e., the length of the read/write operation)
/// which must be a multiple of the dominating sector/block/page size.
///
/// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for different
/// ways of dealing with the special case that the buffer is not full by the time
/// we are done writing.
///
/// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
/// The next flush is to offset `start_offset + Buffer::cap`. The one after at `start_offset + 2 * Buffer::cap` and so on.
///
/// TODO: decouple buffer capacity from alignment requirement.
/// Right now we assume [`Buffer::cap`] is the alignment requirement,
/// but actually [`Buffer::cap`] should only determine how often we flush
/// while writing, while a separate alignment requirement argument should
/// be passed to determine alignment requirement. This could be used by
/// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
/// padding of zeroes. For example, today, with a capacity of 64KiB, we
/// would pad up to 64KiB-1 bytes of zeroes, then truncate off 64KiB-1.
/// This is wasteful, e.g., if the alignment requirement is 4KiB, we only
/// need to pad & truncate up to 4KiB-1 bytes of zeroes
///
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
// since we would avoid copying majority of the data into the internal buffer.
// https://github.com/neondatabase/neon/issues/10101
pub struct BufferedWriter<B: Buffer, W> {
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
@@ -102,24 +60,9 @@ pub struct BufferedWriter<B: Buffer, W> {
bytes_submitted: u64,
}
/// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
///
/// Cf the [`BufferedWriter`] comment's paragraph for context on why we need to think about this.
pub enum BufferedWriterShutdownMode {
/// Drop pending data, don't write back to file.
DropTail,
/// Pad the pending data with zeroes (cf [`usize::next_multiple_of`]).
ZeroPadToNextMultiple(usize),
/// Fill the IO buffer with zeroes, flush to disk, the `ftruncate` the
/// file to the exact number of bytes written to [`Self`].
///
/// TODO: see in [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
PadThenTruncate,
}
impl<B, Buf, W> BufferedWriter<B, W>
where
B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
B: Buffer<IoBuf = Buf> + Send + 'static,
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
@@ -128,7 +71,6 @@ where
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: W,
start_offset: u64,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -146,7 +88,7 @@ where
ctx.attached_child(),
flush_task_span,
),
bytes_submitted: start_offset,
bytes_submitted: 0,
}
}
@@ -167,80 +109,18 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn shutdown(
mut self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<(u64, W), FlushTaskError> {
let mut mutable = self.mutable.take().expect("must not use after an error");
let unpadded_pending = mutable.pending();
let final_len: u64;
let shutdown_req;
match mode {
BufferedWriterShutdownMode::DropTail => {
trace!(pending=%mutable.pending(), "dropping pending data");
drop(mutable);
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
self.flush(ctx).await?;
final_len = self.bytes_submitted;
shutdown_req = ShutdownRequest { set_len: None };
}
BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
let len = mutable.pending();
let cap = mutable.cap();
assert!(
len <= cap,
"buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
);
let padded_len = len.next_multiple_of(next_multiple);
assert!(
padded_len <= cap,
"caller specified a multiple that is larger than the buffer capacity"
);
let count = padded_len - len;
mutable.extend_with(0, count);
trace!(count, "padding with zeros");
self.mutable = Some(mutable);
final_len = self.bytes_submitted + padded_len.into_u64();
shutdown_req = ShutdownRequest { set_len: None };
}
BufferedWriterShutdownMode::PadThenTruncate => {
let len = mutable.pending();
let cap = mutable.cap();
// TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
let alignment_requirement = cap;
assert!(len <= cap, "buffer impl should ensure this");
let padding_end_offset = len.next_multiple_of(alignment_requirement);
assert!(
padding_end_offset <= cap,
"{padding_end_offset} <= {cap} ({alignment_requirement})"
);
let count = padding_end_offset - len;
mutable.extend_with(0, count);
trace!(count, "padding with zeros");
self.mutable = Some(mutable);
final_len = self.bytes_submitted + len.into_u64();
shutdown_req = ShutdownRequest {
// Avoid set_len call if we didn't need to pad anything.
set_len: if count > 0 { Some(final_len) } else { None },
};
}
};
let padded_pending = self.mutable.as_ref().map(|b| b.pending());
trace!(unpadded_pending, padded_pending, "padding done");
if self.mutable.is_some() {
self.flush(ctx).await?;
}
let Self {
mutable: _,
mutable: buf,
maybe_flushed: _,
mut flush_handle,
bytes_submitted: _,
bytes_submitted: bytes_amount,
} = self;
let writer = flush_handle.shutdown(shutdown_req).await?;
Ok((final_len, writer))
let writer = flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
}
#[cfg(test)]
@@ -344,10 +224,6 @@ pub trait Buffer {
/// panics if `other.len() > self.cap() - self.pending()`.
fn extend_from_slice(&mut self, other: &[u8]);
/// Add `count` bytes `val` into `self`.
/// Panics if `count > self.cap() - self.pending()`.
fn extend_with(&mut self, val: u8, count: usize);
/// Number of bytes in the buffer.
fn pending(&self) -> usize;
@@ -375,14 +251,6 @@ impl Buffer for IoBufferMut {
IoBufferMut::extend_from_slice(self, other);
}
fn extend_with(&mut self, val: u8, count: usize) {
if self.len() + count > self.cap() {
panic!("Buffer capacity exceeded");
}
IoBufferMut::put_bytes(self, val, count);
}
fn pending(&self) -> usize {
self.len()
}
@@ -405,22 +273,26 @@ impl Buffer for IoBufferMut {
mod tests {
use std::sync::Mutex;
use rstest::rstest;
use super::*;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
#[derive(Debug, PartialEq, Eq)]
enum Op {
Write { buf: Vec<u8>, offset: u64 },
SetLen { len: u64 },
}
#[derive(Default, Debug)]
struct RecorderWriter {
/// record bytes and write offsets.
recording: Mutex<Vec<Op>>,
writes: Mutex<Vec<(Vec<u8>, u64)>>,
}
impl RecorderWriter {
/// Gets recorded bytes and write offsets.
fn get_writes(&self) -> Vec<Vec<u8>> {
self.writes
.lock()
.unwrap()
.iter()
.map(|(buf, _)| buf.clone())
.collect()
}
}
impl OwnedAsyncWriter for RecorderWriter {
@@ -430,42 +302,28 @@ mod tests {
offset: u64,
_: &RequestContext,
) -> (FullSlice<Buf>, std::io::Result<()>) {
self.recording.lock().unwrap().push(Op::Write {
buf: Vec::from(&buf[..]),
offset,
});
self.writes
.lock()
.unwrap()
.push((Vec::from(&buf[..]), offset));
(buf, Ok(()))
}
async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
self.recording.lock().unwrap().push(Op::SetLen { len });
Ok(())
}
}
fn test_ctx() -> RequestContext {
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
}
#[rstest]
#[tokio::test]
async fn test_write_all_borrowed_always_goes_through_buffer(
#[values(
BufferedWriterShutdownMode::DropTail,
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
BufferedWriterShutdownMode::PadThenTruncate
)]
mode: BufferedWriterShutdownMode,
) -> anyhow::Result<()> {
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let cap = 4;
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
0,
|| IoBufferMut::with_capacity(cap),
|| IoBufferMut::with_capacity(2),
gate.enter()?,
cancel,
ctx,
@@ -475,89 +333,23 @@ mod tests {
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"", ctx).await?;
writer.write_buffered_borrowed(b"d", ctx).await?;
writer.write_buffered_borrowed(b"efg", ctx).await?;
writer.write_buffered_borrowed(b"hijklm", ctx).await?;
let mut expect = {
[(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
.into_iter()
.map(|(offset, v)| Op::Write {
offset,
buf: v[..].to_vec(),
})
.collect::<Vec<_>>()
};
let expect_next_offset = 12;
match &mode {
BufferedWriterShutdownMode::DropTail => (),
// We test the case with padding to next multiple of 2 so that it's different
// from the alignment requirement of 4 inferred from buffer capacity.
// See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
expect.push(Op::Write {
offset: expect_next_offset,
// it's legitimate for pad-to-next multiple 2 to be < alignment requirement 4 inferred from buffer capacity
buf: b"m\0".to_vec(),
});
}
BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
BufferedWriterShutdownMode::PadThenTruncate => {
expect.push(Op::Write {
offset: expect_next_offset,
buf: b"m\0\0\0".to_vec(),
});
expect.push(Op::SetLen { len: 13 });
}
}
let (_, recorder) = writer.shutdown(mode, ctx).await?;
assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
Ok(())
}
#[tokio::test]
async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let cap = 4;
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
0,
|| IoBufferMut::with_capacity(cap),
gate.enter()?,
cancel,
ctx,
tracing::Span::none(),
);
// write a multiple of `cap`
writer.write_buffered_borrowed(b"abc", ctx).await?;
writer.write_buffered_borrowed(b"defgh", ctx).await?;
let (_, recorder) = writer
.shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
.await?;
let expect = {
[(0, b"abcd"), (4, b"efgh")]
.into_iter()
.map(|(offset, v)| Op::Write {
offset,
buf: v[..].to_vec(),
})
.collect::<Vec<_>>()
};
writer.write_buffered_borrowed(b"e", ctx).await?;
writer.write_buffered_borrowed(b"fg", ctx).await?;
writer.write_buffered_borrowed(b"hi", ctx).await?;
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.shutdown(ctx).await?;
assert_eq!(
&*recorder.recording.lock().unwrap(),
&expect,
"set_len should not be called if the buffer is already aligned"
recorder.get_writes(),
{
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
expect
}
.iter()
.map(|v| v[..].to_vec())
.collect::<Vec<_>>()
);
Ok(())
}
}

View File

@@ -1,7 +1,7 @@
use std::ops::ControlFlow;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span, warn};
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
@@ -18,7 +18,7 @@ pub struct FlushHandle<Buf, W> {
pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
}
@@ -27,27 +27,9 @@ struct FlushRequest<Buf> {
slice: FullSlice<Buf>,
offset: u64,
#[cfg(test)]
ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
#[cfg(test)]
done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
pub struct ShutdownRequest {
pub set_len: Option<u64>,
}
enum Request<Buf> {
Flush(FlushRequest<Buf>),
Shutdown(ShutdownRequest),
}
impl<Buf> Request<Buf> {
fn op_str(&self) -> &'static str {
match self {
Request::Flush(_) => "flush",
Request::Shutdown(_) => "shutdown",
}
}
done_flush_tx: tokio::sync::oneshot::Sender<()>,
}
/// Constructs a request and a control object for a new flush operation.
@@ -69,8 +51,8 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
let request = FlushRequest {
slice,
offset,
ready_to_flush_rx: Some(ready_to_flush_rx),
done_flush_tx: Some(done_flush_tx),
ready_to_flush_rx,
done_flush_tx,
};
(request, control)
}
@@ -177,7 +159,10 @@ where
let (request, flush_control) = new_flush_op(slice, offset);
// Submits the buffer to the background task.
self.send(Request::Flush(request)).await?;
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
// Wait for an available buffer from the background flush task.
// This is the BACKPRESSURE mechanism: if the flush task can't keep up,
@@ -189,28 +174,15 @@ where
Ok((recycled, flush_control))
}
/// Sends poison pill to flush task and waits for it to exit.
pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
self.send(Request::Shutdown(req)).await?;
self.wait().await
}
async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
let submit = self.inner_mut().channel.send(request).await;
if submit.is_err() {
return self.handle_error().await;
}
Ok(())
}
async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
Err(self
.wait()
.shutdown()
.await
.expect_err("flush task only disconnects duplex if it exits with an error"))
}
async fn wait(&mut self) -> Result<W, FlushTaskError> {
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
let handle = self
.inner
.take()
@@ -232,7 +204,7 @@ where
pub struct FlushBackgroundTask<Buf, W> {
/// A bi-directional channel that receives (buffer, offset) for writes,
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writter for persisting data to disk.
writer: W,
ctx: RequestContext,
@@ -254,7 +226,7 @@ where
{
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: W,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -273,9 +245,15 @@ where
async fn run(mut self) -> Result<W, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
let op_kind = request.op_str();
#[cfg(test)]
{
// In test, wait for control to signal that we are ready to flush.
if request.ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
// Perform the requested operation.
// Write slice to disk at `offset`.
//
// Error handling happens according to the current policy of crashing
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
@@ -284,112 +262,52 @@ where
//
// TODO: use utils::backoff::retry once async closures are actually usable
//
let mut request_storage = Some(request);
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
if self.cancel.is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {
let request: Request<Buf> = request_storage .take().expect(
if attempt > 1 {
info!("retrying flush");
}
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
match &request {
Request::Shutdown(ShutdownRequest { set_len: None }) => {
request_storage = Some(request);
return ControlFlow::Break(());
},
Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
},
}
if attempt > 1 {
warn!(op=%request.op_str(), "retrying");
}
// borrows so we can async move the requests into async block while not moving these borrows here
let writer = &self.writer;
let request_storage = &mut request_storage;
let ctx = &self.ctx;
let io_fut = match request {
Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
#[cfg(test)]
if let Some(ready_to_flush_rx) = ready_to_flush_rx {
{
// In test, wait for control to signal that we are ready to flush.
if ready_to_flush_rx.await.is_err() {
tracing::debug!("control dropped");
}
}
}
let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
*request_storage = Some(Request::Flush(FlushRequest {
slice,
offset,
#[cfg(test)]
ready_to_flush_rx: None, // the contract is that we notify before first attempt
#[cfg(test)]
done_flush_tx
}));
res
}),
Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
let set_len = set_len.expect("we filter out the None case above");
let res = writer.set_len(set_len, ctx).await;
*request_storage = Some(Request::Shutdown(ShutdownRequest {
set_len: Some(set_len),
}));
res
}),
};
// Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
// Don't cancel this write by doing tokio::select with self.cancel.cancelled().
// The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
// If we retry indefinitely, we'll deplete those resources.
// Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
// wait for cancelled ops to complete and discard their error.
let res = io_fut.await;
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = res.maybe_fatal_err("owned_buffers_io flush");
let Err(err) = res else {
if attempt > 1 {
warn!(op=%op_kind, "retry succeeded");
}
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
ControlFlow::Continue(())
}
.instrument(info_span!("attempt", %attempt, %op_kind))
.instrument(info_span!("flush_attempt", %attempt))
.await;
match result {
ControlFlow::Break(()) => break,
ControlFlow::Continue(()) => continue,
}
}
let request = request_storage.expect("loop must have run at least once");
let slice = slice_storage.expect("loop must have run at least once");
let slice = match request {
Request::Flush(FlushRequest {
slice,
#[cfg(test)]
mut done_flush_tx,
..
}) => {
#[cfg(test)]
{
// In test, tell control we are done flushing buffer.
if done_flush_tx.take().expect("always Some").send(()).is_err() {
tracing::debug!("control dropped");
}
}
slice
#[cfg(test)]
{
// In test, tell control we are done flushing buffer.
if request.done_flush_tx.send(()).is_err() {
tracing::debug!("control dropped");
}
Request::Shutdown(_) => {
// next iteration will observe recv() returning None
continue;
}
};
}
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
let send_res = self.channel.send(slice).await;
if send_res.is_err() {
if self.channel.send(slice).await.is_err() {
// Although channel is closed. Still need to finish flushing the remaining buffers.
continue;
}

View File

@@ -33,10 +33,6 @@ impl OwnedAsyncWriter for TempVirtualFile {
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
VirtualFile::set_len(self, len, ctx).await
}
}
impl Drop for TempVirtualFile {

18
poetry.lock generated
View File

@@ -1274,14 +1274,14 @@ files = [
[[package]]
name = "h11"
version = "0.16.0"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"},
{file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"},
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
]
[[package]]
@@ -1314,25 +1314,25 @@ files = [
[[package]]
name = "httpcore"
version = "1.0.9"
version = "1.0.3"
description = "A minimal low-level HTTP client."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"},
{file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"},
{file = "httpcore-1.0.3-py3-none-any.whl", hash = "sha256:9a6a501c3099307d9fd76ac244e08503427679b1e81ceb1d922485e2f2462ad2"},
{file = "httpcore-1.0.3.tar.gz", hash = "sha256:5c0f9546ad17dac4d0772b0808856eb616eb8b48ce94f49ed819fd6982a8a544"},
]
[package.dependencies]
certifi = "*"
h11 = ">=0.16"
h11 = ">=0.13,<0.15"
[package.extras]
asyncio = ["anyio (>=4.0,<5.0)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
trio = ["trio (>=0.22.0,<1.0)"]
trio = ["trio (>=0.22.0,<0.24.0)"]
[[package]]
name = "httpx"

View File

@@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
pub(crate) const REQUEST_FAILED: &str = "Control plane request failed";
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]

View File

@@ -11,7 +11,6 @@ bench = []
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
futures.workspace = true
@@ -20,14 +19,12 @@ futures-util.workspace = true
humantime.workspace = true
hyper = { workspace = true, features = ["full"] }
http-body-util.workspace = true
http-utils.workspace = true
hyper-util = "0.1"
once_cell.workspace = true
parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true
utils.workspace = true

View File

@@ -17,13 +17,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use clap::{Parser, command};
use futures::future::OptionFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::Full;
use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
use hyper::service::service_fn;
@@ -41,7 +38,7 @@ use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -62,25 +59,12 @@ project_build_tag!(BUILD_TAG);
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
const DEFAULT_SSL_KEY_FILE: &str = "server.key";
const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
#[clap(group(
clap::ArgGroup::new("listen-addresses")
.required(true)
.multiple(true)
.args(&["listen_addr", "listen_https_addr"]),
))]
struct Args {
/// Endpoint to listen HTTP on.
#[arg(short, long)]
listen_addr: Option<SocketAddr>,
/// Endpoint to listen HTTPS on.
#[arg(long)]
listen_https_addr: Option<SocketAddr>,
/// Endpoint to listen on.
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
listen_addr: SocketAddr,
/// Size of the queue to the per timeline subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
timeline_chan_size: usize,
@@ -88,20 +72,11 @@ struct Args {
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
all_keys_chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
/// Path to a file with certificate's private key for https API.
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
}
/// Id of publisher for registering in maps
@@ -699,50 +674,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
let http_listener = match &args.listen_addr {
Some(addr) => {
info!("listening HTTP on {}", addr);
Some(TcpListener::bind(addr).await?)
}
None => None,
};
let (https_listener, tls_acceptor) = match &args.listen_https_addr {
Some(addr) => {
let listener = TcpListener::bind(addr).await?;
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file,
&args.ssl_cert_file,
args.ssl_cert_reload_period,
)
.await?;
let mut tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
// Tonic is HTTP/2 only and it negotiates it with ALPN.
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
info!("listening HTTPS on {}", addr);
(Some(listener), Some(acceptor))
}
None => (None, None),
};
// grpc is served along with http1 for metrics on a single port, hence we
// don't use tonic's Server.
let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
info!("listening on {}", &args.listen_addr);
loop {
let (conn, is_https) = tokio::select! {
Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
};
let (tcp_stream, addr) = match conn {
let (stream, addr) = match tcp_listener.accept().await {
Ok(v) => v,
Err(e) => {
info!("couldn't accept connection: {e}");
@@ -797,32 +734,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
.await;
let tls_acceptor = tls_acceptor.clone();
tokio::task::spawn(async move {
let res = if is_https {
let tls_acceptor =
tls_acceptor.expect("tls_acceptor is set together with https_listener");
let tls_stream = match tls_acceptor.accept(tcp_stream).await {
Ok(tls_stream) => tls_stream,
Err(e) => {
info!("error accepting TLS connection from {addr}: {e}");
return;
}
};
builder
.serve_connection(TokioIo::new(tls_stream), service_fn_)
.await
} else {
builder
.serve_connection(TokioIo::new(tcp_stream), service_fn_)
.await
};
let res = builder
.serve_connection(TokioIo::new(stream), service_fn_)
.await;
if let Err(e) = res {
info!(%is_https, "error serving connection from {addr}: {e}");
info!("error serving connection from {addr}: {e}");
}
});
}

View File

@@ -72,7 +72,6 @@ impl HttpState {
neon_metrics: NeonMetrics::new(build_info),
allowlist_routes: &[
"/status",
"/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1261,8 +1260,16 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
let result = state.service.step_down().await;
// Spawn a background task: once we start stepping down, we must finish: if the client drops
// their request we should avoid stopping in some part-stepped-down state.
let handle = tokio::spawn(async move {
let state = get_state(&req);
state.service.step_down().await
});
let result = handle
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, result)
}
@@ -1394,8 +1401,6 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiE
}
/// Status endpoint is just used for checking that our HTTP listener is up
///
/// This serves as our k8s startup probe.
async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
@@ -1407,30 +1412,6 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
}
/// Liveness endpoint indicates that this storage controller is in a state
/// where it can fulfill it's responsibilties. Namely, startup has finished
/// and it is the current leader.
///
/// This serves as our k8s liveness probe.
async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
let live = state.service.startup_complete.is_ready()
&& state.service.get_leadership_status() == LeadershipStatus::Leader;
if live {
json_response(StatusCode::OK, ())
} else {
json_response(StatusCode::SERVICE_UNAVAILABLE, ())
}
}
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1764,7 +1745,6 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
const NOT_FOR_FORWARD: &[&str] = &[
"/control/v1/step_down",
"/status",
"/live",
"/ready",
"/metrics",
"/profile/cpu",
@@ -1989,9 +1969,6 @@ pub fn make_router(
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})
.get("/live", |r| {
named_request_span(r, handle_live, RequestName("live"))
})
.get("/ready", |r| {
named_request_span(r, handle_ready, RequestName("ready"))
})

View File

@@ -43,19 +43,6 @@ impl Leadership {
&self,
) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
let leader = self.current_leader().await?;
if leader.as_ref().map(|l| &l.address)
== self
.config
.address_for_peers
.as_ref()
.map(Uri::to_string)
.as_ref()
{
// We already are the current leader. This is a restart.
return Ok((leader, None));
}
let leader_step_down_state = if let Some(ref leader) = leader {
if self.config.start_as_candidate {
self.request_step_down(leader).await

View File

@@ -196,7 +196,7 @@ struct Cli {
ssl_cert_reload_period: humantime::Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
ssl_ca_file: Option<PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).

View File

@@ -55,12 +55,9 @@ impl ResponseErrorMessageExt for reqwest::Response {
}
}
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
const STEP_DOWN_RETRIES: u32 = 8;
const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
impl PeerClient {
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
Self {
@@ -79,7 +76,7 @@ impl PeerClient {
req
};
let req = req.timeout(STEP_DOWN_TIMEOUT);
let req = req.timeout(Duration::from_secs(2));
let res = req
.send()
@@ -97,7 +94,8 @@ impl PeerClient {
}
/// Request the peer to step down and return its current observed state
/// All errors are re-tried
/// All errors are retried with exponential backoff for a maximum of 4 attempts.
/// Assuming all retries are performed, the function times out after roughly 4 seconds.
pub(crate) async fn step_down(
&self,
cancel: &CancellationToken,
@@ -106,7 +104,7 @@ impl PeerClient {
|| self.request_step_down(),
|_e| false,
2,
STEP_DOWN_RETRIES,
4,
"Send step down request",
cancel,
)

View File

@@ -133,8 +133,6 @@ pub(crate) enum DatabaseOperation {
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
ListTimelineImports,
IsTenantImportingTimeline,
}
#[must_use]
@@ -1642,30 +1640,6 @@ impl Persistence {
.await
}
pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult<Vec<TimelineImport>> {
use crate::schema::timeline_imports::dsl;
let persistent = self
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelineImportPersistence> =
dsl::timeline_imports.load(conn).await?;
Ok(from_db)
})
})
.await?;
let imports: Result<Vec<TimelineImport>, _> = persistent
.into_iter()
.map(TimelineImport::from_persistent)
.collect();
match imports {
Ok(ok) => Ok(ok.into_iter().collect()),
Err(err) => Err(DatabaseError::Logical(format!(
"failed to deserialize import: {err}"
))),
}
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
@@ -1769,25 +1743,6 @@ impl Persistence {
})
.await
}
pub(crate) async fn is_tenant_importing_timeline(
&self,
tenant_id: TenantId,
) -> DatabaseResult<bool> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| {
Box::pin(async move {
let imports: i64 = dsl::timeline_imports
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.count()
.get_result(conn)
.await?;
Ok(imports > 0)
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {

View File

@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use anyhow::Context;
@@ -97,9 +97,7 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -526,9 +524,6 @@ pub struct Service {
/// HTTP client with proper CA certs.
http_client: reqwest::Client,
/// Handle for the step down background task if one was ever requested
step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
}
impl From<ReconcileWaitError> for ApiError {
@@ -880,40 +875,6 @@ impl Service {
});
}
// Reconcile the timeline imports:
// 1. Mark each tenant shard of tenants with an importing timeline as importing.
// 2. Finalize the completed imports in the background. This handles the case where
// the previous storage controller instance shut down whilst finalizing imports.
let imports = self.persistence.list_timeline_imports().await;
match imports {
Ok(mut imports) => {
{
let mut locked = self.inner.write().unwrap();
for import in &imports {
locked
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| {
shard.importing = TimelineImportState::Importing
});
}
}
imports.retain(|import| import.is_complete());
tokio::task::spawn({
let finalize_imports_self = self.clone();
async move {
finalize_imports_self
.finalize_timeline_imports(imports)
.await
}
});
}
Err(err) => {
tracing::error!("Could not retrieve completed imports from database: {err}");
}
}
tracing::info!(
"Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
);
@@ -1784,7 +1745,6 @@ impl Service {
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
step_down_barrier: Default::default(),
});
let result_task_this = this.clone();
@@ -3792,22 +3752,6 @@ impl Service {
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let is_import = create_req.is_import();
if is_import {
// Ensure that there is no split on-going.
// [`Self::tenant_shard_split`] holds the exclusive tenant lock
// for the duration of the split, but here we handle the case
// where we restarted and the split is being aborted.
let locked = self.inner.read().unwrap();
let splitting = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.any(|(_id, shard)| shard.splitting != SplitState::Idle);
if splitting {
return Err(ApiError::Conflict("Tenant is splitting shard".to_string()));
}
}
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
@@ -3845,14 +3789,6 @@ impl Service {
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
// Set the importing flag on the tenant shards
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing);
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
@@ -3929,9 +3865,12 @@ impl Service {
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
tracing::info!("Finalizing timeline import");
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
pausable_failpoint!("timeline-import-pre-cplane-notification");
tracing::info!("Finalizing timeline import");
let import_failed = import.completion_error().is_some();
@@ -3975,13 +3914,6 @@ impl Service {
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
@@ -3990,15 +3922,6 @@ impl Service {
Ok(())
}
async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
futures::future::join_all(
imports
.into_iter()
.map(|import| self.finalize_timeline_import(import)),
)
.await;
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
@@ -4965,7 +4888,6 @@ impl Service {
is_reconciling: shard.reconciler.is_some(),
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
is_importing: shard.importing == TimelineImportState::Importing,
scheduling_policy: shard.get_scheduling_policy(),
preferred_az_id: shard.preferred_az().map(ToString::to_string),
})
@@ -5456,27 +5378,6 @@ impl Service {
.enter()
.map_err(|_| ApiError::ShuttingDown)?;
// Timeline imports on the pageserver side can't handle shard-splits.
// If the tenant is importing a timeline, dont't shard split it.
match self
.persistence
.is_tenant_importing_timeline(tenant_id)
.await
{
Ok(importing) => {
if importing {
return Err(ApiError::Conflict(
"Cannot shard split during timeline import".to_string(),
));
}
}
Err(err) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Failed to check for running imports: {err}"
)));
}
}
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let new_stripe_size = split_req.new_stripe_size;
@@ -8149,25 +8050,12 @@ impl Service {
candidates.extend(size_candidates);
}
// Filter out tenants in a prohibiting scheduling modes
// and tenants with an ongoing import.
//
// Note that the import check here is oportunistic. An import might start
// after the check before we actually update [`TenantShard::splitting`].
// [`Self::tenant_shard_split`] checks the database whilst holding the exclusive
// tenant lock. Imports might take a long time, so the check here allows us
// to split something else instead of trying the same shard over and over.
// Filter out tenants in a prohibiting scheduling mode.
{
let state = self.inner.read().unwrap();
candidates.retain(|i| {
let shard = state.tenants.get(&i.id);
match shard {
Some(t) => {
t.get_scheduling_policy() == ShardSchedulingPolicy::Active
&& t.importing == TimelineImportState::Idle
}
None => false,
}
let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
policy == Some(ShardSchedulingPolicy::Active)
});
}
@@ -8998,59 +8886,27 @@ impl Service {
self.inner.read().unwrap().get_leadership_status()
}
/// Handler for step down requests
///
/// Step down runs in separate task since once it's called it should
/// be driven to completion. Subsequent requests will wait on the same
/// step down task.
pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
let handle = self.step_down_barrier.get_or_init(|| {
let step_down_self = self.clone();
let (tx, rx) = tokio::sync::watch::channel::<Option<GlobalObservedState>>(None);
tokio::spawn(async move {
let state = step_down_self.step_down_task().await;
tx.send(Some(state))
.expect("Task Arc<Service> keeps receiver alive");
});
rx
});
handle
.clone()
.wait_for(|observed_state| observed_state.is_some())
.await
.expect("Task Arc<Service> keeps sender alive")
.deref()
.clone()
.expect("Checked above")
}
async fn step_down_task(&self) -> GlobalObservedState {
pub(crate) async fn step_down(&self) -> GlobalObservedState {
tracing::info!("Received step down request from peer");
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
self.inner.write().unwrap().step_down();
let stop_reconciliations =
self.stop_reconciliations(StopReconciliationsReason::SteppingDown);
let mut stop_reconciliations = std::pin::pin!(stop_reconciliations);
// Wait for reconciliations to stop, or terminate this process if they
// fail to stop in time (this indicates a bug in shutdown)
tokio::select! {
_ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
tracing::info!("Reconciliations stopped, proceeding with step down");
}
_ = async {
failpoint_support::sleep_millis_async!("step-down-delay-timeout");
tokio::time::sleep(Duration::from_secs(10)).await
} => {
tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
let started_at = Instant::now();
// Wait for reconciliations to stop and warn if that's taking a long time
loop {
tokio::select! {
_ = &mut stop_reconciliations => {
tracing::info!("Reconciliations stopped, proceeding with step down");
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
tracing::warn!(
elapsed_sec=%started_at.elapsed().as_secs(),
"Stopping reconciliations during step down is taking too long"
);
}
// The caller may proceed to act as leader when it sees this request fail: reduce the chance
// of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
std::process::exit(1);
}
}

View File

@@ -33,7 +33,6 @@ use crate::scheduler::{
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
};
use crate::service::ReconcileResultRequest;
use crate::timeline_import::TimelineImportState;
use crate::{Sequence, service};
/// Serialization helper
@@ -101,10 +100,6 @@ pub(crate) struct TenantShard {
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// Flag indicating whether the tenant has an in-progress timeline import.
/// Used to disallow shard splits while an import is in progress.
pub(crate) importing: TimelineImportState,
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
@@ -588,7 +583,6 @@ impl TenantShard {
config: TenantConfig::default(),
reconciler: None,
splitting: SplitState::Idle,
importing: TimelineImportState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
@@ -1850,8 +1844,6 @@ impl TenantShard {
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
splitting: tsp.splitting,
// Filled in during [`Service::startup_reconcile`]
importing: TimelineImportState::Idle,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),

View File

@@ -14,12 +14,6 @@ use utils::{
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum TimelineImportState {
Importing,
Idle,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
@@ -109,7 +103,7 @@ impl TimelineImport {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && *crnt != status {
} else if crnt.is_terminal() && !status.is_terminal() {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;

View File

@@ -16,5 +16,4 @@ pytest_plugins = (
"fixtures.slow",
"fixtures.reruns",
"fixtures.fast_import",
"fixtures.pg_config",
)

View File

@@ -501,9 +501,6 @@ class NeonEnvBuilder:
# Flag to use https listener in storage controller, generate local ssl certs,
# and force pageservers and neon_local to use https for storage controller api.
self.use_https_storage_controller_api: bool = False
# Flag to use https listener in storage broker, generate local ssl certs,
# and force pageservers and safekeepers to use https for storage broker api.
self.use_https_storage_broker_api: bool = False
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
@@ -1089,7 +1086,7 @@ class NeonEnv:
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self, config.use_https_storage_broker_api)
self.broker = NeonBroker(self)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
self.pg_version = config.pg_version
@@ -1109,7 +1106,6 @@ class NeonEnv:
config.use_https_pageserver_api
or config.use_https_safekeeper_api
or config.use_https_storage_controller_api
or config.use_https_storage_broker_api
)
self.ssl_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
@@ -1182,18 +1178,15 @@ class NeonEnv:
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
"default_tenant_id": str(self.initial_tenant),
"broker": {},
"broker": {
"listen_addr": self.broker.listen_addr(),
},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
if config.use_https_storage_broker_api:
cfg["broker"]["listen_https_addr"] = self.broker.listen_addr()
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
@@ -1298,11 +1291,7 @@ class NeonEnv:
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
else:
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
@@ -3391,9 +3380,6 @@ class VanillaPostgres(PgProtocol):
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(self.pgdatadir / subdir)
def is_running(self) -> bool:
return self.running
def __enter__(self) -> Self:
return self
@@ -4940,10 +4926,9 @@ class Safekeeper(LogUtils):
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""
def __init__(self, env: NeonEnv, use_https: bool):
super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log")
def __init__(self, env: NeonEnv):
super().__init__(logfile=env.repo_dir / "storage_broker.log")
self.env = env
self.scheme = "https" if use_https else "http"
self.port: int = self.env.port_distributor.get_port()
self.running = False
@@ -4966,7 +4951,7 @@ class NeonBroker(LogUtils):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"{self.scheme}://{self.listen_addr()}"
return f"http://{self.listen_addr()}"
def assert_no_errors(self):
assert_no_errors(self.logfile, "storage_controller", [])

Some files were not shown because too many files have changed in this diff Show More