Compare commits

..

1 Commits

Author SHA1 Message Date
Dmitrii Kovalkov
0a57a85fc0 pageserver: add enable-basebackup-cache feature flag for gradual roll out 2025-06-13 14:51:42 +02:00
126 changed files with 1381 additions and 4412 deletions

View File

@@ -38,11 +38,6 @@ on:
required: false
default: 1
type: number
rerun-failed:
description: 'rerun failed tests to ignore flaky tests'
required: false
default: true
type: boolean
defaults:
run:
@@ -384,7 +379,7 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
@@ -392,14 +387,14 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.rerun-failed }}
rerun_failed: ${{ inputs.test-run-count == 1 }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -58,7 +58,6 @@ jobs:
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
rerun-failed: false
secrets: inherit
create-test-report:

View File

@@ -199,28 +199,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.

View File

@@ -1,151 +0,0 @@
name: Build and Test Fully
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 3 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ debug, release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v14", "lfc_state": "with-lfc"},
{"pg_version":"v15", "lfc_state": "with-lfc"},
{"pg_version":"v16", "lfc_state": "with-lfc"},
{"pg_version":"v17", "lfc_state": "with-lfc"},
{"pg_version":"v14", "lfc_state": "without-lfc"},
{"pg_version":"v15", "lfc_state": "without-lfc"},
{"pg_version":"v16", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "withouts-lfc"}]'
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -79,7 +79,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v17"}]'
sanitizers: enabled
secrets: inherit

View File

@@ -33,19 +33,11 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
# test only read-only custom scripts in new branch without database maintenance
- target: new_branch
custom_scripts: select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3
test_maintenance: false
# test all custom scripts in new branch with database maintenance
- target: new_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
test_maintenance: true
# test all custom scripts in reuse branch with database maintenance
- target: reuse_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
test_maintenance: true
max-parallel: 1 # we want to run each benchmark sequentially to not have noisy neighbors on shared storage (PS, SK)
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
contents: write
statuses: write
@@ -153,7 +145,6 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark database maintenance
if: ${{ matrix.test_maintenance == 'true' }}
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}

View File

@@ -1,175 +0,0 @@
name: large oltp growth
# workflow to grow the reuse branch of large oltp benchmark continuously (about 16 GB per run)
on:
# uncomment to run on push for debugging your PR
# push:
# branches: [ bodobolero/increase_large_oltp_workload ]
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 6 * * *' # 06:00 UTC
- cron: '0 8 * * *' # 08:00 UTC
- cron: '0 10 * * *' # 10:00 UTC
- cron: '0 12 * * *' # 12:00 UTC
- cron: '0 14 * * *' # 14:00 UTC
- cron: '0 16 * * *' # 16:00 UTC
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we need dedicated resources which only exist once
group: large-oltp-growth
cancel-in-progress: true
permissions:
contents: read
jobs:
oltp:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
# for now only grow the reuse branch, not the other branches.
- target: reuse_branch
custom_scripts:
- grow_action_blocks.sql
- grow_action_kwargs.sql
- grow_device_fingerprint_event.sql
- grow_edges.sql
- grow_hotel_rate_mapping.sql
- grow_ocr_pipeline_results_version.sql
- grow_priceline_raw_response.sql
- grow_relabled_transactions.sql
- grow_state_values.sql
- grow_values.sql
- grow_vertices.sql
- update_accounting_coding_body_tracking_category_selection.sql
- update_action_blocks.sql
- update_action_kwargs.sql
- update_denormalized_approval_workflow.sql
- update_device_fingerprint_event.sql
- update_edges.sql
- update_heron_transaction_enriched_log.sql
- update_heron_transaction_enrichment_requests.sql
- update_hotel_rate_mapping.sql
- update_incoming_webhooks.sql
- update_manual_transaction.sql
- update_ml_receipt_matching_log.sql
- update_ocr_pipeine_results_version.sql
- update_orc_pipeline_step_results.sql
- update_orc_pipeline_step_results_version.sql
- update_priceline_raw_response.sql
- update_quickbooks_transactions.sql
- update_raw_finicity_transaction.sql
- update_relabeled_transactions.sql
- update_state_values.sql
- update_stripe_authorization_event_log.sql
- update_transaction.sql
- update_values.sql
- update_vertices.sql
max-parallel: 1 # we want to run each growth workload sequentially (for now there is just one)
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "1h"
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ join(matrix.custom_scripts, ' ') }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: 16 # pre-determined by pre-determined project
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
PLATFORM: ${{ matrix.target }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
run: |
case "${{ matrix.target }}" in
reuse_branch)
CONNSTR=${{ secrets.BENCHMARK_LARGE_OLTP_REUSE_CONNSTR }}
;;
*)
echo >&2 "Unknown target=${{ matrix.target }}"
exit 1
;;
esac
CONNSTR_WITHOUT_POOLER="${CONNSTR//-pooler/}"
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
echo "connstr_without_pooler=${CONNSTR_WITHOUT_POOLER}" >> $GITHUB_OUTPUT
- name: pgbench with custom-scripts
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: true
extra_params: -m remote_cluster --timeout 7200 -k test_perf_oltp_large_tenant_growth
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Periodic large oltp tenant growth increase: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

3
Cargo.lock generated
View File

@@ -4465,14 +4465,11 @@ dependencies = [
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"futures",
"pageserver_api",
"postgres_ffi",
"prost 0.13.5",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tonic-build",
"utils",

3
compute/.gitignore vendored
View File

@@ -3,6 +3,3 @@ etc/neon_collector.yml
etc/neon_collector_autoscaling.yml
etc/sql_exporter.yml
etc/sql_exporter_autoscaling.yml
# Node.js dependencies
node_modules/

View File

@@ -48,11 +48,3 @@ jsonnetfmt-test:
.PHONY: jsonnetfmt-format
jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json
npm install
touch node_modules

View File

@@ -149,10 +149,8 @@ RUN case $DEBIAN_VERSION in \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libclang-dev \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
&& apt clean && rm -rf /var/lib/apt/lists/*
#########################################################################################
#
@@ -540,33 +538,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/vector.control
#########################################################################################
#
# Layer "pg_tpcds-build"
# compile pg_tpcds extension
#
#########################################################################################
FROM build-deps AS pg_tpcds-src
ARG PG_VERSION
WORKDIR /ext-src/
RUN case "${PG_VERSION:?}" in \
"v14" ) \
echo "Skipping pg_tpcds for PG_VERSION=$PG_VERSION" && exit 0 ;; \
* ) \
git clone --recurse-submodules --depth 1 https://github.com/neondatabase-labs/pg_tpcds.git pg_tpcds-src ;; \
esac
FROM pg-build AS pg_tpcds-build
COPY --from=pg_tpcds-src /ext-src/ /ext-src/
WORKDIR /ext-src/
RUN if [ -d pg_tpcds-src ]; then \
cd pg_tpcds-src && \
cmake -Bbuild && \
cmake --build build --target install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_tpcds.control; \
fi
#########################################################################################
#
# Layer "pgjwt-build"
@@ -1086,10 +1057,17 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
#########################################################################################
#
# Layer "build-deps with Rust toolchain installed"
# Layer "pg build with nonroot user and cargo installed"
# This layer is base and common for layers with `pgrx`
#
#########################################################################################
FROM build-deps AS build-deps-with-cargo
FROM pg-build AS pg-build-nonroot-with-cargo
ARG PG_VERSION
RUN apt update && \
apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \
apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
@@ -1104,29 +1082,13 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init
#########################################################################################
#
# Layer "pg-build with Rust toolchain installed"
# This layer is base and common for layers with `pgrx`
#
#########################################################################################
FROM pg-build AS pg-build-with-cargo
ARG PG_VERSION
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
USER nonroot
WORKDIR /home/nonroot
COPY --from=build-deps-with-cargo /home/nonroot /home/nonroot
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgrx` deps
#
#########################################################################################
FROM pg-build-with-cargo AS rust-extensions-build
FROM pg-build-nonroot-with-cargo AS rust-extensions-build
ARG PG_VERSION
RUN case "${PG_VERSION:?}" in \
@@ -1148,7 +1110,7 @@ USER root
# and eventually get merged with `rust-extensions-build`
#
#########################################################################################
FROM pg-build-with-cargo AS rust-extensions-build-pgrx12
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx12
ARG PG_VERSION
RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
@@ -1165,7 +1127,7 @@ USER root
# and eventually get merged with `rust-extensions-build`
#
#########################################################################################
FROM pg-build-with-cargo AS rust-extensions-build-pgrx14
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
@@ -1182,12 +1144,10 @@ USER root
FROM build-deps AS pgrag-src
ARG PG_VERSION
WORKDIR /ext-src
COPY compute/patches/onnxruntime.patch .
WORKDIR /ext-src
RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.gz -O onnxruntime.tar.gz && \
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
patch -p1 < /ext-src/onnxruntime.patch && \
echo "#nothing to test here" > neon-test.sh
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.2.tar.gz -O pgrag.tar.gz && \
@@ -1748,7 +1708,6 @@ COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_tpcds-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
#

View File

@@ -21,8 +21,6 @@ unix_socket_dir=/tmp/
unix_socket_mode=0777
; required for pgbouncer_exporter
ignore_startup_parameters=extra_float_digits
; pidfile for graceful termination
pidfile=/tmp/pgbouncer.pid
;; Disable connection logging. It produces a lot of logs that no one looks at,
;; and we can get similar log entries from the proxy too. We had incidents in

View File

@@ -1,209 +0,0 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Neon Compute Manifest Schema",
"description": "Schema for Neon compute node configuration manifest",
"type": "object",
"properties": {
"pg_settings": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"client_connection_check_interval": {
"type": "string",
"description": "Check for client disconnection interval in milliseconds"
},
"effective_io_concurrency": {
"type": "string",
"description": "Effective IO concurrency setting"
},
"fsync": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to force fsync to disk"
},
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled"
},
"idle_in_transaction_session_timeout": {
"type": "string",
"description": "Timeout for idle transactions in milliseconds"
},
"listen_addresses": {
"type": "string",
"description": "Addresses to listen on"
},
"log_connections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log connections"
},
"log_disconnections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log disconnections"
},
"log_temp_files": {
"type": "string",
"description": "Size threshold for logging temporary files in KB"
},
"log_error_verbosity": {
"type": "string",
"enum": ["terse", "verbose", "default"],
"description": "Error logging verbosity level"
},
"log_min_error_statement": {
"type": "string",
"description": "Minimum error level for statement logging"
},
"maintenance_io_concurrency": {
"type": "string",
"description": "Maintenance IO concurrency setting"
},
"max_connections": {
"type": "string",
"description": "Maximum number of connections"
},
"max_replication_flush_lag": {
"type": "string",
"description": "Maximum replication flush lag"
},
"max_replication_slots": {
"type": "string",
"description": "Maximum number of replication slots"
},
"max_replication_write_lag": {
"type": "string",
"description": "Maximum replication write lag"
},
"max_wal_senders": {
"type": "string",
"description": "Maximum number of WAL senders"
},
"max_wal_size": {
"type": "string",
"description": "Maximum WAL size"
},
"neon.unstable_extensions": {
"type": "string",
"description": "List of unstable extensions"
},
"neon.protocol_version": {
"type": "string",
"description": "Neon protocol version"
},
"password_encryption": {
"type": "string",
"description": "Password encryption method"
},
"restart_after_crash": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to restart after crash"
},
"superuser_reserved_connections": {
"type": "string",
"description": "Number of reserved connections for superuser"
},
"synchronous_standby_names": {
"type": "string",
"description": "Names of synchronous standby servers"
},
"wal_keep_size": {
"type": "string",
"description": "WAL keep size"
},
"wal_level": {
"type": "string",
"description": "WAL level"
},
"wal_log_hints": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log hints in WAL"
},
"wal_sender_timeout": {
"type": "string",
"description": "WAL sender timeout in milliseconds"
}
},
"required": [
"client_connection_check_interval",
"effective_io_concurrency",
"fsync",
"hot_standby",
"idle_in_transaction_session_timeout",
"listen_addresses",
"log_connections",
"log_disconnections",
"log_temp_files",
"log_error_verbosity",
"log_min_error_statement",
"maintenance_io_concurrency",
"max_connections",
"max_replication_flush_lag",
"max_replication_slots",
"max_replication_write_lag",
"max_wal_senders",
"max_wal_size",
"neon.unstable_extensions",
"neon.protocol_version",
"password_encryption",
"restart_after_crash",
"superuser_reserved_connections",
"synchronous_standby_names",
"wal_keep_size",
"wal_level",
"wal_log_hints",
"wal_sender_timeout"
]
},
"replica": {
"type": "object",
"properties": {
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled for replicas"
}
},
"required": ["hot_standby"]
},
"per_version": {
"type": "object",
"patternProperties": {
"^1[4-7]$": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"io_combine_limit": {
"type": "string",
"description": "IO combine limit"
}
}
},
"replica": {
"type": "object",
"properties": {
"recovery_prefetch": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to enable recovery prefetch for PostgreSQL replicas"
}
}
}
}
}
}
}
},
"required": ["common", "replica", "per_version"]
}
},
"required": ["pg_settings"]
}

View File

@@ -105,17 +105,17 @@ pg_settings:
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
16:
common: {}
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
15:
common: {}
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
14:
common: {}
replica: {}
common:
replica:

View File

@@ -1,37 +0,0 @@
{
"name": "neon-compute",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "neon-compute",
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
},
"node_modules/@sourcemeta/jsonschema": {
"version": "9.3.4",
"resolved": "https://registry.npmjs.org/@sourcemeta/jsonschema/-/jsonschema-9.3.4.tgz",
"integrity": "sha512-hkujfkZAIGXUs4U//We9faZW8LZ4/H9LqagRYsFSulH/VLcKPNhZyCTGg7AhORuzm27zqENvKpnX4g2FzudYFw==",
"cpu": [
"x64",
"arm64"
],
"license": "AGPL-3.0",
"os": [
"darwin",
"linux",
"win32"
],
"bin": {
"jsonschema": "cli.js"
},
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sourcemeta"
}
}
}
}

View File

@@ -1,7 +0,0 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -1,15 +0,0 @@
diff --git a/cmake/deps.txt b/cmake/deps.txt
index d213b09034..229de2ebf0 100644
--- a/cmake/deps.txt
+++ b/cmake/deps.txt
@@ -22,7 +22,9 @@ dlpack;https://github.com/dmlc/dlpack/archive/refs/tags/v0.6.zip;4d565dd2e5b3132
# it contains changes on top of 3.4.0 which are required to fix build issues.
# Until the 3.4.1 release this is the best option we have.
# Issue link: https://gitlab.com/libeigen/eigen/-/issues/2744
-eigen;https://gitlab.com/libeigen/eigen/-/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;be8be39fdbc6e60e94fa7870b280707069b5b81a
+# Moved to github mirror to avoid gitlab issues.Add commentMore actions
+# Issue link: https://github.com/bazelbuild/bazel-central-registry/issues/4355
+eigen;https://github.com/eigen-mirror/eigen/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;61418a349000ba7744a3ad03cf5071f22ebf860a
flatbuffers;https://github.com/google/flatbuffers/archive/refs/tags/v23.5.26.zip;59422c3b5e573dd192fead2834d25951f1c1670c
fp16;https://github.com/Maratyszcza/FP16/archive/0a92994d729ff76a58f692d3028ca1b64b145d91.zip;b985f6985a05a1c03ff1bb71190f66d8f98a1494
fxdiv;https://github.com/Maratyszcza/FXdiv/archive/63058eff77e11aa15bf531df5dd34395ec3017c8.zip;a5658f4036402dbca7cebee32be57fb8149811e1

View File

@@ -124,10 +124,6 @@ struct Cli {
/// Interval in seconds for collecting installed extensions statistics
#[arg(long, default_value = "3600")]
pub installed_extensions_collection_interval: u64,
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
}
impl Cli {
@@ -163,7 +159,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
runtime.block_on(init(cli.dev))?;
runtime.block_on(init())?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -202,13 +198,13 @@ fn main() -> Result<()> {
deinit_and_exit(exit_code);
}
async fn init(dev_mode: bool) -> Result<()> {
async fn init() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {
handle_exit_signal(sig, dev_mode);
handle_exit_signal(sig);
}
});
@@ -267,9 +263,9 @@ fn deinit_and_exit(exit_code: Option<i32>) -> ! {
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32, dev_mode: bool) {
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
forward_termination_signal(dev_mode);
forward_termination_signal();
exit(1);
}

View File

@@ -35,7 +35,6 @@ use url::Url;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use utils::pid_file;
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
@@ -45,7 +44,6 @@ use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::pgbouncer::*;
use crate::rsyslog::{
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
launch_pgaudit_gc,
@@ -163,10 +161,6 @@ pub struct ComputeState {
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
/// mode == ComputeMode::Primary. None otherwise
pub terminate_flush_lsn: Option<Lsn>,
pub metrics: ComputeMetrics,
}
@@ -182,7 +176,6 @@ impl ComputeState {
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
}
}
@@ -222,46 +215,6 @@ pub struct ParsedSpec {
pub endpoint_storage_token: Option<String>,
}
impl ParsedSpec {
pub fn validate(&self) -> Result<(), String> {
// Only Primary nodes are using safekeeper_connstrings, and at the moment
// this method only validates that part of the specs.
if self.spec.mode != ComputeMode::Primary {
return Ok(());
}
// While it seems like a good idea to check for an odd number of entries in
// the safekeepers connection string, changes to the list of safekeepers might
// incur appending a new server to a list of 3, in which case a list of 4
// entries is okay in production.
//
// Still we want unique entries, and at least one entry in the vector
if self.safekeeper_connstrings.is_empty() {
return Err(String::from("safekeeper_connstrings is empty"));
}
// check for uniqueness of the connection strings in the set
let mut connstrings = self.safekeeper_connstrings.clone();
connstrings.sort();
let mut previous = &connstrings[0];
for current in connstrings.iter().skip(1) {
// duplicate entry?
if current == previous {
return Err(format!(
"duplicate entry in safekeeper_connstrings: {}!",
current,
));
}
previous = current;
}
Ok(())
}
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
@@ -291,7 +244,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
} else {
spec.safekeeper_connstrings.clone()
};
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
@@ -326,7 +278,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
let res = ParsedSpec {
Ok(ParsedSpec {
spec,
pageserver_connstr,
safekeeper_connstrings,
@@ -335,11 +287,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
};
// Now check validity of the parsed specification
res.validate()?;
Ok(res)
})
}
}
@@ -406,6 +354,11 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
@@ -536,21 +489,12 @@ impl ComputeNode {
// Reap the postgres process
delay_exit |= this.cleanup_after_postgres_exit()?;
// /terminate returns LSN. If we don't sleep at all, connection will break and we
// won't get result. If we sleep too much, tests will take significantly longer
// and Github Action run will error out
let sleep_duration = if delay_exit {
Duration::from_secs(30)
} else {
Duration::from_millis(300)
};
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
std::thread::sleep(Duration::from_secs(30));
}
std::thread::sleep(sleep_duration);
Ok(exit_code)
}
@@ -922,25 +866,20 @@ impl ComputeNode {
// Maybe sync safekeepers again, to speed up next startup
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let lsn = if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
info!("syncing safekeepers on shutdown");
let storage_auth_token = pspec.storage_auth_token.clone();
let lsn = self.sync_safekeepers(storage_auth_token)?;
info!(%lsn, "synced safekeepers");
Some(lsn)
} else {
info!("not primary, not syncing safekeepers");
None
};
info!("synced safekeepers at lsn {lsn}");
}
let mut delay_exit = false;
let mut state = self.state.lock().unwrap();
state.terminate_flush_lsn = lsn;
if let ComputeStatus::TerminationPending { mode } = state.status {
if state.status == ComputeStatus::TerminationPending {
state.status = ComputeStatus::Terminated;
self.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = mode == compute_api::responses::TerminateMode::Fast
delay_exit = true
}
drop(state);
@@ -1811,7 +1750,7 @@ impl ComputeNode {
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::TerminationPending
| ComputeStatus::Terminated => break 'cert_update,
// wait
@@ -2312,68 +2251,12 @@ pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
Ok(())
}
pub fn forward_termination_signal(dev_mode: bool) {
pub fn forward_termination_signal() {
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
if !dev_mode {
info!("not in dev mode, terminating pgbouncer");
// Terminate pgbouncer with SIGKILL
match pid_file::read(PGBOUNCER_PIDFILE.into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGKILL to pgbouncer process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGKILL) {
error!("failed to terminate pgbouncer: {}", e);
}
}
// pgbouncer does not lock the pid file, so we read and kill the process directly
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
if let Ok(pid_str) = std::fs::read_to_string(PGBOUNCER_PIDFILE) {
if let Ok(pid) = pid_str.trim().parse::<i32>() {
info!(
"sending SIGKILL to pgbouncer process pid: {} (from unlocked pid file)",
pid
);
if let Err(e) = kill(Pid::from_raw(pid), Signal::SIGKILL) {
error!("failed to terminate pgbouncer: {}", e);
}
}
} else {
info!("pgbouncer pid file exists but process not running");
}
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("pgbouncer pid file not found, process may not be running");
}
Err(e) => {
error!("error reading pgbouncer pid file: {}", e);
}
}
}
// Terminate local_proxy
match pid_file::read("/etc/local_proxy/pid".into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGTERM to local_proxy process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGTERM) {
error!("failed to terminate local_proxy: {}", e);
}
}
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
info!("local_proxy PID file exists but process not running");
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("local_proxy PID file not found, process may not be running");
}
Err(e) => {
error!("error reading local_proxy PID file: {}", e);
}
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
@@ -2406,21 +2289,3 @@ impl<T: 'static> JoinSetExt<T> for tokio::task::JoinSet<T> {
})
}
}
#[cfg(test)]
mod tests {
use std::fs::File;
use super::*;
#[test]
fn duplicate_safekeeper_connstring() {
let file = File::open("tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
match ParsedSpec::try_from(spec.clone()) {
Ok(_p) => panic!("Failed to detect duplicate entry"),
Err(e) => assert!(e.starts_with("duplicate entry in safekeeper_connstrings:")),
};
}
}

View File

@@ -1,42 +1,32 @@
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use compute_api::responses::ComputeStatus;
use http::StatusCode;
use tokio::task;
use tracing::info;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: compute_api::responses::TerminateMode,
}
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
/// Terminate the compute.
pub(in crate::http) async fn terminate(
State(compute): State<Arc<ComputeNode>>,
OptionalQuery(terminate): OptionalQuery<TerminateQuery>,
) -> Response {
let mode = terminate.unwrap_or_default().mode;
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
return StatusCode::CREATED.into_response();
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
drop(state);
}
forward_termination_signal(false);
forward_termination_signal();
info!("sent signal and notified waiters");
// Spawn a blocking thread to wait for compute to become Terminated.
@@ -44,7 +34,7 @@ pub(in crate::http) async fn terminate(
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
let lsn = task::spawn_blocking(move || {
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Terminated {
state = c.state_changed.wait(state).unwrap();
@@ -54,10 +44,11 @@ pub(in crate::http) async fn terminate(
state.status
);
}
state.terminate_flush_lsn
})
.await
.unwrap();
info!("terminated Postgres");
JsonResponse::success(StatusCode::OK, TerminateResponse { lsn })
StatusCode::OK.into_response()
}

View File

@@ -22,7 +22,6 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

View File

@@ -83,9 +83,7 @@ impl ComputeMonitor {
let compute_status = self.compute.get_status();
if matches!(
compute_status,
ComputeStatus::Terminated
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Failed
ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
) {
info!(
"compute is in {} status, stopping compute monitor",

View File

@@ -1 +0,0 @@
pub const PGBOUNCER_PIDFILE: &str = "/tmp/pgbouncer.pid";

View File

@@ -1,6 +0,0 @@
### Test files
The file `cluster_spec.json` has been copied over from libs/compute_api
tests, with some edits:
- the neon.safekeepers setting contains a duplicate value

View File

@@ -1,245 +0,0 @@
{
"format_version": 1.0,
"timestamp": "2021-05-23T18:25:43.511Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
"cluster": {
"cluster_id": "test-cluster-42",
"name": "Zenith Test",
"state": "restarted",
"roles": [
{
"name": "postgres",
"encrypted_password": "6b1d16b78004bbd51fa06af9eda75972",
"options": null
},
{
"name": "alexk",
"encrypted_password": null,
"options": null
},
{
"name": "zenith \"new\"",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972",
"options": null
},
{
"name": "zen",
"encrypted_password": "9b1d16b78004bbd51fa06af9eda75972"
},
{
"name": "\"name\";\\n select 1;",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
},
{
"name": "MyRole",
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
}
],
"databases": [
{
"name": "DB2",
"owner": "alexk",
"options": [
{
"name": "LC_COLLATE",
"value": "C",
"vartype": "string"
},
{
"name": "LC_CTYPE",
"value": "C",
"vartype": "string"
},
{
"name": "TEMPLATE",
"value": "template0",
"vartype": "enum"
}
]
},
{
"name": "zenith",
"owner": "MyRole"
},
{
"name": "zen",
"owner": "zen"
}
],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "logical",
"vartype": "enum"
},
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501,127.0.0.1:6502",
"vartype": "string"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "32768",
"vartype": "integer"
},
{
"name": "port",
"value": "55432",
"vartype": "integer"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "wal_sender_timeout",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "neon.tenant_id",
"value": "b0554b632bd4d547a63b86c3630317e8",
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.timeline_id",
"value": "2414a61ffc94e428f14b5758fe308e13",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=127.0.0.1 port=6400",
"vartype": "string"
},
{
"name": "test.escaping",
"value": "here's a backslash \\ and a quote ' and a double-quote \" hooray",
"vartype": "string"
}
]
},
"delta_operations": [
{
"action": "delete_db",
"name": "zenith_test"
},
{
"action": "rename_db",
"name": "DB",
"new_name": "DB2"
},
{
"action": "delete_role",
"name": "zenith2"
},
{
"action": "rename_role",
"name": "zenith new",
"new_name": "zenith \"new\""
}
],
"remote_extensions": {
"library_index": {
"postgis-3": "postgis",
"libpgrouting-3.4": "postgis",
"postgis_raster-3": "postgis",
"postgis_sfcgal-3": "postgis",
"postgis_topology-3": "postgis",
"address_standardizer-3": "postgis"
},
"extension_data": {
"postgis": {
"archive_path": "5834329303/v15/extensions/postgis.tar.zst",
"control_data": {
"postgis.control": "# postgis extension\ncomment = ''PostGIS geometry and geography spatial types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis-3''\nrelocatable = false\ntrusted = true\n",
"pgrouting.control": "# pgRouting Extension\ncomment = ''pgRouting Extension''\ndefault_version = ''3.4.2''\nmodule_pathname = ''$libdir/libpgrouting-3.4''\nrelocatable = true\nrequires = ''plpgsql''\nrequires = ''postgis''\ntrusted = true\n",
"postgis_raster.control": "# postgis_raster extension\ncomment = ''PostGIS raster types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis_raster-3''\nrelocatable = false\nrequires = postgis\ntrusted = true\n",
"postgis_sfcgal.control": "# postgis topology extension\ncomment = ''PostGIS SFCGAL functions''\ndefault_version = ''3.3.2''\nrelocatable = true\nrequires = postgis\ntrusted = true\n",
"postgis_topology.control": "# postgis topology extension\ncomment = ''PostGIS topology spatial types and functions''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = topology\nrequires = postgis\ntrusted = true\n",
"address_standardizer.control": "# address_standardizer extension\ncomment = ''Used to parse an address into constituent elements. Generally used to support geocoding address normalization step.''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n",
"postgis_tiger_geocoder.control": "# postgis tiger geocoder extension\ncomment = ''PostGIS tiger geocoder and reverse geocoder''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = tiger\nrequires = ''postgis,fuzzystrmatch''\nsuperuser= false\ntrusted = true\n",
"address_standardizer_data_us.control": "# address standardizer us dataset\ncomment = ''Address Standardizer US dataset example''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n"
}
}
},
"custom_extensions": [],
"public_extensions": ["postgis"]
},
"pgbouncer_settings": {
"default_pool_size": "42",
"pool_mode": "session"
}
}

View File

@@ -18,7 +18,7 @@ use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -605,14 +605,6 @@ struct EndpointCreateCmdArgs {
#[clap(long, help = "Postgres version")]
pg_version: u32,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
///
/// Specified on creation such that it's retained across reconfiguration and restarts.
///
/// NB: not yet supported by computes.
#[clap(long)]
grpc: bool,
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
@@ -672,13 +664,6 @@ struct EndpointStartCmdArgs {
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
#[arg(default_value = "90s")]
start_timeout: Duration,
#[clap(
long,
help = "Run in development mode, skipping VM-specific operations like process termination",
action = clap::ArgAction::SetTrue
)]
dev: bool,
}
#[derive(clap::Args)]
@@ -711,9 +696,10 @@ struct EndpointStopCmdArgs {
)]
destroy: bool,
#[clap(long, help = "Postgres shutdown mode")]
#[clap(default_value = "fast")]
mode: EndpointTerminateMode,
#[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
#[arg(value_parser(["smart", "fast", "immediate"]))]
#[arg(default_value = "fast")]
mode: String,
}
#[derive(clap::Args)]
@@ -1465,7 +1451,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.internal_http_port,
args.pg_version,
mode,
args.grpc,
!args.update_catalog,
false,
)?;
@@ -1506,20 +1491,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
@@ -1538,20 +1516,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
};
anyhow::Ok(pageserver)
anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
))
}),
)
.await?;
@@ -1596,7 +1565,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,
args.dev,
)
.await?;
}
@@ -1607,19 +1575,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
vec![pageserver]
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
vec![(
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1628,21 +1588,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.shards
.into_iter()
.map(|shard| {
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>()
};
@@ -1657,10 +1607,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
match endpoint.stop(args.mode, args.destroy).await?.lsn {
Some(lsn) => println!("{lsn}"),
None => println!("null"),
}
endpoint.stop(&args.mode, args.destroy)?;
}
EndpointCmd::GenerateJwt(args) => {
let endpoint = {
@@ -2092,16 +2039,11 @@ async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Resul
}
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let mode = if immediate {
EndpointTerminateMode::Immediate
} else {
EndpointTerminateMode::Fast
};
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(mode, false).await {
if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
eprintln!("postgres stop failed: {e:#}");
}
}

View File

@@ -37,7 +37,6 @@
//! ```
//!
use std::collections::BTreeMap;
use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
@@ -52,8 +51,7 @@ use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
TlsConfig,
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
@@ -78,6 +76,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -90,7 +89,6 @@ pub struct EndpointConf {
external_http_port: u16,
internal_http_port: u16,
pg_version: u32,
grpc: bool,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
@@ -194,7 +192,6 @@ impl ComputeControlPlane {
internal_http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
@@ -229,7 +226,6 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
@@ -248,7 +244,6 @@ impl ComputeControlPlane {
internal_http_port,
pg_port,
pg_version,
grpc,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
@@ -303,8 +298,6 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mode: ComputeMode,
/// If true, the endpoint should use gRPC to communicate with Pageservers.
pub grpc: bool,
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
pub pg_address: SocketAddr,
@@ -340,58 +333,15 @@ pub enum EndpointStatus {
RunningNoPidfile,
}
impl Display for EndpointStatus {
impl std::fmt::Display for EndpointStatus {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
writer.write_str(match self {
let s = match self {
Self::Running => "running",
Self::Stopped => "stopped",
Self::Crashed => "crashed",
Self::RunningNoPidfile => "running, no pidfile",
})
}
}
#[derive(Default, Clone, Copy, clap::ValueEnum)]
pub enum EndpointTerminateMode {
#[default]
/// Use pg_ctl stop -m fast
Fast,
/// Use pg_ctl stop -m immediate
Immediate,
/// Use /terminate?mode=immediate
ImmediateTerminate,
}
impl std::fmt::Display for EndpointTerminateMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match &self {
EndpointTerminateMode::Fast => "fast",
EndpointTerminateMode::Immediate => "immediate",
EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
})
}
}
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
};
write!(writer, "{}", s)
}
}
@@ -430,7 +380,6 @@ impl Endpoint {
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
grpc: conf.grpc,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
@@ -659,10 +608,10 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
pageservers
.iter()
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
@@ -707,12 +656,11 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageservers: Vec<(Host, u16)>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
dev: bool,
) -> Result<()> {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
@@ -883,10 +831,6 @@ impl Endpoint {
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
}
if dev {
cmd.arg("--dev");
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {
@@ -939,7 +883,7 @@ impl Endpoint {
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::TerminationPending
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
@@ -997,12 +941,10 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -1014,7 +956,25 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If we weren't given explicit pageservers, query the storage controller
if pageservers.is_empty() {
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>();
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
@@ -1061,27 +1021,8 @@ impl Endpoint {
}
}
pub async fn stop(
&self,
mode: EndpointTerminateMode,
destroy: bool,
) -> Result<TerminateResponse> {
// pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
// slow, and test runs time out. Solution: special mode "immediate-terminate"
// which uses /terminate
let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
let ip = self.external_http_address.ip();
let port = self.external_http_address.port();
let url = format!("http://{ip}:{port}/terminate?mode=immediate");
let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
let request = reqwest::Client::new().post(url).bearer_auth(token);
let response = request.send().await.context("/terminate")?;
let text = response.text().await.context("/terminate result")?;
serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
} else {
self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
TerminateResponse { lsn: None }
};
pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
self.pg_ctl(&["-m", mode, "stop"], &None)?;
// Also wait for the compute_ctl process to die. It might have some
// cleanup work to do after postgres stops, like syncing safekeepers,
@@ -1091,7 +1032,7 @@ impl Endpoint {
// waiting. Sometimes we do *not* want this cleanup: tests intentionally
// do stop when majority of safekeepers is down, so sync-safekeepers
// would hang otherwise. This could be a separate flag though.
let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
let send_sigterm = destroy || mode == "immediate";
self.wait_for_compute_ctl_to_exit(send_sigterm)?;
if destroy {
println!(
@@ -1100,7 +1041,7 @@ impl Endpoint {
);
std::fs::remove_dir_all(self.endpoint_path())?;
}
Ok(response)
Ok(())
}
pub fn connstr(&self, user: &str, db_name: &str) -> String {

View File

@@ -16,7 +16,6 @@ use std::time::Duration;
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -253,10 +252,9 @@ impl PageServerNode {
// the storage controller
let metadata_path = datadir.join("metadata.json");
let http_host = "localhost".to_string();
let (_, http_port) =
let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
let http_port = http_port.unwrap_or(9898);
let https_port = match self.conf.listen_https_addr.as_ref() {
Some(https_addr) => {
@@ -267,13 +265,6 @@ impl PageServerNode {
None => None,
};
let (mut grpc_host, mut grpc_port) = (None, None);
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
grpc_host = Some("localhost".to_string());
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
}
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -282,9 +273,7 @@ impl PageServerNode {
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: self.pg_connection_config.port(),
grpc_host,
grpc_port,
http_host,
http_host: "localhost".to_string(),
http_port,
https_port,
other: HashMap::from([(

View File

@@ -36,10 +36,6 @@ enum Command {
listen_pg_addr: String,
#[arg(long)]
listen_pg_port: u16,
#[arg(long)]
listen_grpc_addr: Option<String>,
#[arg(long)]
listen_grpc_port: Option<u16>,
#[arg(long)]
listen_http_addr: String,
@@ -422,8 +418,6 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -437,8 +431,6 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,

View File

@@ -95,4 +95,3 @@ echo "Start compute node"
-b /usr/local/bin/postgres \
--compute-id "compute-${RANDOM}" \
--config "${CONFIG_FILE}"
--dev

View File

@@ -83,16 +83,6 @@ pub struct ComputeStatusResponse {
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum TerminateMode {
#[default]
/// wait 30s till returning from /terminate to allow control plane to get the error
Fast,
/// return from /terminate immediately as soon as all components are terminated
Immediate,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
@@ -113,16 +103,11 @@ pub enum ComputeStatus {
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPending { mode: TerminateMode },
TerminationPending,
// Terminated Postgres
Terminated,
}
#[derive(Deserialize, Serialize)]
pub struct TerminateResponse {
pub lsn: Option<utils::lsn::Lsn>,
}
impl Display for ComputeStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -132,7 +117,7 @@ impl Display for ComputeStatus {
ComputeStatus::Running => f.write_str("running"),
ComputeStatus::Configuration => f.write_str("configuration"),
ComputeStatus::Failed => f.write_str("failed"),
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
ComputeStatus::TerminationPending => f.write_str("termination-pending"),
ComputeStatus::Terminated => f.write_str("terminated"),
}
}

View File

@@ -419,13 +419,13 @@ pub fn now() -> u64 {
with_thread_context(|ctx| ctx.clock.get().unwrap().now())
}
pub fn exit(code: i32, msg: String) -> ! {
pub fn exit(code: i32, msg: String) {
with_thread_context(|ctx| {
ctx.allow_panic.store(true, Ordering::SeqCst);
let mut result = ctx.result.lock();
*result = (code, msg);
panic!("exit");
})
});
}
pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {

View File

@@ -12,7 +12,6 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap;
use std::fmt::Display;
use std::num::{NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::Duration;
@@ -25,17 +24,16 @@ use utils::logging::LogFormat;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not needed by the pageserver
// as a separate structure. This information is not neeed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeMetadata {
#[serde(rename = "host")]
pub postgres_host: String,
#[serde(rename = "port")]
pub postgres_port: u16,
pub grpc_host: Option<String>,
pub grpc_port: Option<u16>,
pub http_host: String,
pub http_port: u16,
pub https_port: Option<u16>,
@@ -46,23 +44,6 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
impl Display for NodeMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"postgresql://{}:{} ",
self.postgres_host, self.postgres_port
)?;
if let Some(grpc_host) = &self.grpc_host {
let grpc_port = self.grpc_port.unwrap_or_default();
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
}
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
write!(f, "other:{:?}", self.other)?;
Ok(())
}
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
@@ -356,21 +337,16 @@ pub struct TimelineImportConfig {
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
/// Maximum total size of basebackup cache entries on disk in bytes.
/// The cache may slightly exceed this limit because we do not know
/// the exact size of the cache entry untill it's written to disk.
pub max_total_size_bytes: u64,
// TODO(diko): support max_entry_size_bytes.
// pub max_entry_size_bytes: u64,
pub max_size_entries: usize,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_size_entries: 1000,
}
}

View File

@@ -14,8 +14,6 @@ fn test_node_metadata_v1_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: None,
@@ -39,35 +37,6 @@ fn test_node_metadata_v2_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),
other: HashMap::new(),
}
)
}
#[test]
fn test_node_metadata_v3_backward_compatibilty() {
let v3 = serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": 23,
"grpc_host": "localhost",
"grpc_port": 51,
"http_host": "localhost",
"http_port": 42,
"https_port": 123,
}));
assert_eq!(
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: Some("localhost".to_string()),
grpc_port: Some(51),
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),

View File

@@ -52,8 +52,6 @@ pub struct NodeRegisterRequest {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -103,8 +101,6 @@ pub struct TenantLocateResponseShard {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -156,8 +152,6 @@ pub struct NodeDescribeResponse {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -5,7 +5,6 @@ pub mod controller_api;
pub mod key;
pub mod keyspace;
pub mod models;
pub mod pagestream_api;
pub mod record;
pub mod reltag;
pub mod shard;

View File

@@ -5,12 +5,16 @@ pub mod utilization;
use core::ops::Range;
use std::collections::HashMap;
use std::fmt::Display;
use std::io::{BufRead, Read};
use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
#[cfg(feature = "testing")]
use camino::Utf8PathBuf;
use postgres_ffi::BLCKSZ;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
pub use utilization::PageserverUtilization;
@@ -20,6 +24,7 @@ use utils::{completion, serde_system_time};
use crate::config::Ratio;
use crate::key::{CompactKey, Key};
use crate::reltag::RelTag;
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
/// The state of a tenant in this pageserver.
@@ -1902,6 +1907,219 @@ pub struct ScanDisposableKeysResponse {
pub not_disposable_count: usize,
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
DbSize = 3,
GetSlruSegment = 4,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
Exists = 100,
Nblocks = 101,
GetPage = 102,
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::DbSize),
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
100 => Ok(PagestreamBeMessageTag::Exists),
101 => Ok(PagestreamBeMessageTag::Nblocks),
102 => Ok(PagestreamBeMessageTag::GetPage),
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
}
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// V3 version of protocol adds request ID to all requests. This request ID is also included in response
// as well as other fields from requests, which allows to verify that we receive response for our request.
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
// and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
//
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
V2,
V3,
}
pub type RequestId = u64;
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
pub reqid: RequestId,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
pub hdr: PagestreamRequest,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
pub hdr: PagestreamRequest,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub req: PagestreamExistsRequest,
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub req: PagestreamNblocksRequest,
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub req: PagestreamGetPageRequest,
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub req: PagestreamGetSlruSegmentRequest,
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub req: PagestreamRequest,
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub req: PagestreamDbSizeRequest,
pub db_size: i64,
}
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
@@ -1913,6 +2131,506 @@ pub struct TenantHistorySize {
pub size: Option<u64>,
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 3.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (reqid, request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
0,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V3 => (
body.read_u64::<BigEndian>()?,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
};
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
let mut bytes = BytesMut::new();
use PagestreamBeMessageTag as Tag;
match protocol_version {
PagestreamProtocolVersion::V2 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.req.blkno);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put_u64(resp.req.reqid);
bytes.put_u64(resp.req.request_lsn.0);
bytes.put_u64(resp.req.not_modified_since.0);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.dbnode);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
use PagestreamBeMessageTag as Tag;
let ok =
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
Tag::Exists => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let exists = buf.read_u8()? != 0;
Self::Exists(PagestreamExistsResponse {
req: PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
exists,
})
}
Tag::Nblocks => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let n_blocks = buf.read_u32::<BigEndian>()?;
Self::Nblocks(PagestreamNblocksResponse {
req: PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
n_blocks,
})
}
Tag::GetPage => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
Self::GetPage(PagestreamGetPageResponse {
req: PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
blkno,
},
page: page.into(),
})
}
Tag::Error => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let mut msg = Vec::new();
buf.read_until(0, &mut msg)?;
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
let rust_str = cstring.to_str()?;
Self::Error(PagestreamErrorResponse {
req: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
message: rust_str.to_owned(),
})
}
Tag::DbSize => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let dbnode = buf.read_u32::<BigEndian>()?;
let db_size = buf.read_i64::<BigEndian>()?;
Self::DbSize(PagestreamDbSizeResponse {
req: PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode,
},
db_size,
})
}
Tag::GetSlruSegment => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind,
segno,
},
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
anyhow::bail!(
"remaining bytes in msg with tag={msg_tag}: {}",
remaining.len()
);
}
Ok(ok)
}
pub fn kind(&self) -> &'static str {
match self {
Self::Exists(_) => "Exists",
Self::Nblocks(_) => "Nblocks",
Self::GetPage(_) => "GetPage",
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PageTraceEvent {
pub key: CompactKey,
@@ -1938,6 +2656,68 @@ mod tests {
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
.unwrap();
assert!(msg == reconstructed);
}
}
#[test]
fn test_tenantinfo_serde() {
// Test serialization/deserialization of TenantInfo

View File

@@ -1,792 +0,0 @@
//! Rust definitions of the libpq-based pagestream API
//!
//! See also the C implementation of the same API in pgxn/neon/pagestore_client.h
use std::io::{BufRead, Read};
use crate::reltag::RelTag;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
#[cfg(feature = "testing")]
Test(PagestreamTestRequest),
}
// Wrapped in libpq CopyData
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
#[cfg(feature = "testing")]
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
DbSize = 3,
GetSlruSegment = 4,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
Exists = 100,
Nblocks = 101,
GetPage = 102,
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::DbSize),
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
100 => Ok(PagestreamBeMessageTag::Exists),
101 => Ok(PagestreamBeMessageTag::Nblocks),
102 => Ok(PagestreamBeMessageTag::GetPage),
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
}
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// V3 version of protocol adds request ID to all requests. This request ID is also included in response
// as well as other fields from requests, which allows to verify that we receive response for our request.
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
// and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
//
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
V2,
V3,
}
pub type RequestId = u64;
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
pub reqid: RequestId,
pub request_lsn: Lsn,
pub not_modified_since: Lsn,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
}
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
pub hdr: PagestreamRequest,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
pub hdr: PagestreamRequest,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
pub hdr: PagestreamRequest,
pub kind: u8,
pub segno: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub req: PagestreamExistsRequest,
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub req: PagestreamNblocksRequest,
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub req: PagestreamGetPageRequest,
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
pub req: PagestreamGetSlruSegmentRequest,
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub req: PagestreamRequest,
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub req: PagestreamDbSizeRequest,
pub db_size: i64,
}
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
pub hdr: PagestreamRequest,
pub batch_key: u64,
pub message: String,
}
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
pub req: PagestreamTestRequest,
}
impl PagestreamFeMessage {
/// Serialize a compute -> pageserver message. This is currently only used in testing
/// tools. Always uses protocol version 3.
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
bytes.put_u64(req.batch_key);
let message = req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let (reqid, request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
0,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V3 => (
body.read_u64::<BigEndian>()?,
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
};
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
#[cfg(feature = "testing")]
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key: body.read_u64::<BigEndian>()?,
message: {
let len = body.read_u64::<BigEndian>()?;
let mut buf = vec![0; len as usize];
body.read_exact(&mut buf)?;
String::from_utf8(buf)?
},
})),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
let mut bytes = BytesMut::new();
use PagestreamBeMessageTag as Tag;
match protocol_version {
PagestreamProtocolVersion::V2 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
PagestreamProtocolVersion::V3 => {
match self {
Self::Exists(resp) => {
bytes.put_u8(Tag::Exists as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(Tag::Nblocks as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.rel.spcnode);
bytes.put_u32(resp.req.rel.dbnode);
bytes.put_u32(resp.req.rel.relnode);
bytes.put_u8(resp.req.rel.forknum);
bytes.put_u32(resp.req.blkno);
bytes.put(&resp.page[..])
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put_u64(resp.req.reqid);
bytes.put_u64(resp.req.request_lsn.0);
bytes.put_u64(resp.req.not_modified_since.0);
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(Tag::DbSize as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u32(resp.req.dbnode);
bytes.put_i64(resp.db_size);
}
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
#[cfg(feature = "testing")]
Self::Test(resp) => {
bytes.put_u8(Tag::Test as u8);
bytes.put_u64(resp.req.hdr.reqid);
bytes.put_u64(resp.req.hdr.request_lsn.0);
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u64(resp.req.batch_key);
let message = resp.req.message.as_bytes();
bytes.put_u64(message.len() as u64);
bytes.put_slice(message);
}
}
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
use PagestreamBeMessageTag as Tag;
let ok =
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
Tag::Exists => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let exists = buf.read_u8()? != 0;
Self::Exists(PagestreamExistsResponse {
req: PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
exists,
})
}
Tag::Nblocks => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let n_blocks = buf.read_u32::<BigEndian>()?;
Self::Nblocks(PagestreamNblocksResponse {
req: PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
},
n_blocks,
})
}
Tag::GetPage => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
Self::GetPage(PagestreamGetPageResponse {
req: PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel,
blkno,
},
page: page.into(),
})
}
Tag::Error => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let mut msg = Vec::new();
buf.read_until(0, &mut msg)?;
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
let rust_str = cstring.to_str()?;
Self::Error(PagestreamErrorResponse {
req: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
message: rust_str.to_owned(),
})
}
Tag::DbSize => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let dbnode = buf.read_u32::<BigEndian>()?;
let db_size = buf.read_i64::<BigEndian>()?;
Self::DbSize(PagestreamDbSizeResponse {
req: PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode,
},
db_size,
})
}
Tag::GetSlruSegment => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
kind,
segno,
},
segment: segment.into(),
})
}
#[cfg(feature = "testing")]
Tag::Test => {
let reqid = buf.read_u64::<BigEndian>()?;
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
let batch_key = buf.read_u64::<BigEndian>()?;
let len = buf.read_u64::<BigEndian>()?;
let mut msg = vec![0; len as usize];
buf.read_exact(&mut msg)?;
let message = String::from_utf8(msg)?;
Self::Test(PagestreamTestResponse {
req: PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
batch_key,
message,
},
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
anyhow::bail!(
"remaining bytes in msg with tag={msg_tag}: {}",
remaining.len()
);
}
Ok(ok)
}
pub fn kind(&self) -> &'static str {
match self {
Self::Exists(_) => "Exists",
Self::Nblocks(_) => "Nblocks",
Self::GetPage(_) => "GetPage",
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
#[cfg(feature = "testing")]
Self::Test(_) => "Test",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(4),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(4),
not_modified_since: Lsn(3),
},
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
.unwrap();
assert!(msg == reconstructed);
}
}
}

View File

@@ -311,7 +311,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
}
}
unsafe extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) -> ! {
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;

View File

@@ -144,7 +144,7 @@ pub trait ApiImpl {
todo!()
}
fn finish_sync_safekeepers(&self, _lsn: u64) -> ! {
fn finish_sync_safekeepers(&self, _lsn: u64) {
todo!()
}
@@ -469,7 +469,7 @@ mod tests {
true
}
fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
fn finish_sync_safekeepers(&self, lsn: u64) {
self.sync_channel.send(lsn).unwrap();
panic!("sync safekeepers finished at lsn={}", lsn);
}

View File

@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use pageserver_api::pagestream_api::{
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
};
use pageserver_api::reltag::RelTag;

View File

@@ -20,7 +20,7 @@
//!
//! # local timeline dir
//! ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! grep "__" | cargo run --release --bin pagectl draw-timeline > out.svg
//! grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//!
//! # Layer map dump from `/v1/tenant/$TENANT/timeline/$TIMELINE/layer`
//! (jq -r '.historic_layers[] | .layer_file_name' | cargo run -p pagectl draw-timeline) < layer-map.json > out.svg
@@ -81,11 +81,7 @@ fn build_coordinate_compression_map<T: Ord + Copy>(coords: Vec<T>) -> BTreeMap<T
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
// Remove the temporary file extension, e.g., remove the `.d20a.___temp` part from the following filename:
// 000000067F000040490000404A00441B0000-000000067F000040490000404A00441B4000__000043483A34CE00.d20a.___temp
let lsns = split[1].split('.').collect::<Vec<&str>>()[0];
let mut lsns: Vec<&str> = lsns.split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
// The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001

View File

@@ -13,7 +13,7 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::{LayerFile, parse_filename};
use crate::layer_map_analyzer::parse_filename;
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
@@ -38,8 +38,6 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
/// Dump all information of a layer file locally
DumpLayerLocal { path: PathBuf },
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
@@ -133,7 +131,15 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
for (idx, layer_file) in to_print {
print_layer_file(idx, &layer_file);
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}
Ok(())
}
@@ -153,7 +159,16 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
let layer = layer?;
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
if *id == idx {
print_layer_file(idx, &layer_file);
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if layer_file.is_delta {
read_delta_file(layer.path(), &ctx).await?;
@@ -168,18 +183,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
Ok(())
}
LayerCmd::DumpLayerLocal { path } => {
if let Ok(layer_file) = parse_filename(path.file_name().unwrap().to_str().unwrap()) {
print_layer_file(0, &layer_file);
if layer_file.is_delta {
read_delta_file(path, &ctx).await?;
} else {
read_image_file(path, &ctx).await?;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
@@ -244,15 +247,3 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
}
}
fn print_layer_file(idx: usize, layer_file: &LayerFile) {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}

View File

@@ -5,14 +5,11 @@ edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
pageserver_api.workspace = true
postgres_ffi.workspace = true
prost.workspace = true
thiserror.workspace = true
tokio.workspace = true
tonic.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -102,14 +102,12 @@ message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup.
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
uint64 lsn = 1;
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
// If true, include relation files in the base backup. Mainly for debugging and tests.
bool full = 3;
}
// Base backup response chunk, returned as an ordered stream.

View File

@@ -1,191 +0,0 @@
use std::convert::TryInto;
use bytes::Bytes;
use futures::TryStreamExt;
use futures::{Stream, StreamExt};
use tonic::metadata::AsciiMetadataValue;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
use tonic::{Request, Streaming};
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use anyhow::Result;
use crate::model;
use crate::proto;
///
/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These
/// headers are required at the pageserver.
///
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
shard_id: AsciiMetadataValue,
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
}
impl AuthInterceptor {
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
auth_token: Option<String>,
shard_id: ShardIndex,
) -> Result<Self, InvalidMetadataValue> {
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
let auth_header: Option<AsciiMetadataValue> = match auth_token {
Some(token) => Some(format!("Bearer {token}").try_into()?),
None => None,
};
Ok(Self {
tenant_id: tenant_ascii,
shard_id: shard_ascii,
timeline_id: timeline_ascii,
auth_header,
})
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
req.metadata_mut()
.insert("neon-shard-id", self.shard_id.clone());
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}
#[derive(Clone)]
pub struct Client {
client: proto::PageServiceClient<
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
>,
}
impl Client {
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
into_endpoint: T,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let client = proto::PageServiceClient::with_interceptor(channel, auth);
Ok(Self { client })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: model::CheckRelExistsRequest,
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
let proto_req = proto::CheckRelExistsRequest::from(req);
let response = self.client.check_rel_exists(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
let proto_req = proto::GetBaseBackupRequest::from(req);
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =
self.client.get_base_backup(proto_req).await?.into_inner();
// TODO: Consider dechunking internally
let domain_stream = response_stream.map(|chunk_res| {
chunk_res.and_then(|proto_chunk| {
proto_chunk.try_into().map_err(|e| {
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
})
})
});
Ok(domain_stream)
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(
&mut self,
req: model::GetDbSizeRequest,
) -> Result<u64, tonic::Status> {
let proto_req = proto::GetDbSizeRequest::from(req);
let response = self.client.get_db_size(proto_req).await?;
Ok(response.into_inner().into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance.
/// Per-request errors are often returned as status_code instead of errors,
/// to avoid tearing down the entire stream via tonic::Status.
pub async fn get_pages<ReqSt>(
&mut self,
inbound: ReqSt,
) -> Result<
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
tonic::Status,
>
where
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
{
let outbound_proto = inbound.map(|domain_req| domain_req.into());
let req_new = Request::new(outbound_proto);
let response_stream: Streaming<proto::GetPageResponse> =
self.client.get_pages(req_new).await?.into_inner();
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
Ok(domain_stream)
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: model::GetRelSizeRequest,
) -> Result<model::GetRelSizeResponse, tonic::Status> {
let proto_req = proto::GetRelSizeRequest::from(req);
let response = self.client.get_rel_size(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: model::GetSlruSegmentRequest,
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
let proto_req = proto::GetSlruSegmentRequest::from(req);
let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?)
}
}

View File

@@ -18,8 +18,6 @@ pub mod proto {
pub use page_service_server::{PageService, PageServiceServer};
}
mod client;
pub use client::Client;
mod model;
pub use model::*;

View File

@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
use crate::proto;
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Clone, Debug)]
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("field '{0}' has invalid value '{1}'")]
Invalid(&'static str, String),
@@ -182,33 +182,34 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
}
}
/// Requests a base backup.
/// Requests a base backup at a given LSN.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
pub lsn: Option<Lsn>,
/// The LSN to fetch a base backup at.
pub read_lsn: ReadLsn,
/// If true, logical replication slots will not be created.
pub replica: bool,
/// If true, include relation files in the base backup. Mainly for debugging and tests.
pub full: bool,
}
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
fn from(pb: proto::GetBaseBackupRequest) -> Self {
Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
replica: pb.replica,
full: pb.full,
}
})
}
}
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
fn from(request: GetBaseBackupRequest) -> Self {
Self {
lsn: request.lsn.unwrap_or_default().0,
read_lsn: Some(request.read_lsn.into()),
replica: request.replica,
full: request.full,
}
}
}
@@ -421,39 +422,6 @@ impl From<GetPageResponse> for proto::GetPageResponse {
}
}
impl GetPageResponse {
/// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
/// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
/// GetPageResponse with a non-OK status code instead.
#[allow(clippy::result_large_err)]
pub fn try_from_status(
status: tonic::Status,
request_id: RequestID,
) -> Result<Self, tonic::Status> {
// We shouldn't see an OK status here, because we're emitting an error.
debug_assert_ne!(status.code(), tonic::Code::Ok);
if status.code() == tonic::Code::Ok {
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
// error and we should return a tonic::Status to terminate the stream.
let Ok(status_code) = status.code().try_into() else {
return Err(status);
};
// Return a GetPageResponse for the status.
Ok(Self {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
})
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
@@ -517,42 +485,8 @@ impl From<GetPageStatusCode> for i32 {
}
}
impl TryFrom<tonic::Code> for GetPageStatusCode {
type Error = tonic::Code;
fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
use tonic::Code;
let status_code = match code {
Code::Ok => Self::Ok,
// These are per-request errors, which should be returned as GetPageResponses.
Code::AlreadyExists => Self::InvalidRequest,
Code::DataLoss => Self::InternalError,
Code::FailedPrecondition => Self::InvalidRequest,
Code::InvalidArgument => Self::InvalidRequest,
Code::Internal => Self::InternalError,
Code::NotFound => Self::NotFound,
Code::OutOfRange => Self::InvalidRequest,
Code::ResourceExhausted => Self::SlowDown,
// These should terminate the stream by returning a tonic::Status.
Code::Aborted
| Code::Cancelled
| Code::DeadlineExceeded
| Code::PermissionDenied
| Code::Unauthenticated
| Code::Unavailable
| Code::Unimplemented
| Code::Unknown => return Err(code),
};
Ok(status_code)
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
@@ -596,7 +530,6 @@ impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
}
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetSlruSegmentRequest {
pub read_lsn: ReadLsn,
pub kind: SlruKind,

View File

@@ -12,7 +12,7 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use pageserver_page_api::proto;

View File

@@ -19,10 +19,7 @@ use utils::{
use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
BASEBACKUP_CACHE_SIZE,
},
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
task_mgr::TaskKind,
tenant::{
Timeline,
@@ -39,13 +36,8 @@ pub struct BasebackupPrepareRequest {
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
#[derive(Clone)]
struct CacheEntry {
/// LSN at which the basebackup was taken.
lsn: Lsn,
/// Size of the basebackup archive in bytes.
size_bytes: u64,
}
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
@@ -61,12 +53,21 @@ struct CacheEntry {
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
remove_entry_sender: BasebackupRemoveEntrySender,
entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
cancel: CancellationToken,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
@@ -82,32 +83,35 @@ impl BasebackupCache {
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
let enabled = config.is_some();
let cache = Arc::new(BasebackupCache {
data_dir,
config: config.unwrap_or_default(),
tenant_manager,
remove_entry_sender,
entries: std::sync::Mutex::new(HashMap::new()),
cancel,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
});
if let Some(config) = config {
let background = BackgroundTask {
c: cache.clone(),
config,
tenant_manager,
cancel,
entry_count: 0,
total_size_bytes: 0,
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
};
runtime_handle.spawn(background.run(prepare_receiver));
if enabled {
runtime_handle.spawn(
cache
.clone()
.background(prepare_receiver, remove_entry_receiver),
);
}
cache
@@ -125,7 +129,7 @@ impl BasebackupCache {
) -> Option<tokio::fs::File> {
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
self.read_miss_count.inc();
return None;
}
@@ -163,41 +167,9 @@ impl BasebackupCache {
self.data_dir
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
}
/// The background task that does the job to prepare basebackups
/// and manage the cache entries on disk.
/// It is a separate struct from BasebackupCache to allow holding
/// a mutable reference to this state without a mutex lock,
/// while BasebackupCache is referenced by the clients.
struct BackgroundTask {
c: Arc<BasebackupCache>,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
/// Number of the entries in the cache.
/// This counter is used for metrics and applying cache limits.
/// It generally should be equal to c.entries.len(), but it's calculated
/// pessimistically for abnormal situations: if we encountered some errors
/// during removing the entry from disk, we won't decrement this counter to
/// make sure that we don't exceed the limit with "trashed" files on the disk.
/// It will also count files in the data_dir that are not valid cache entries.
entry_count: usize,
/// Total size of all the entries on the disk.
/// This counter is used for metrics and applying cache limits.
/// Similar to entry_count, it is calculated pessimistically for abnormal situations.
total_size_bytes: u64,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BackgroundTask {
fn tmp_dir(&self) -> Utf8PathBuf {
self.c.data_dir.join("tmp")
self.data_dir.join("tmp")
}
fn entry_tmp_path(
@@ -207,7 +179,7 @@ impl BackgroundTask {
lsn: Lsn,
) -> Utf8PathBuf {
self.tmp_dir()
.join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
@@ -236,11 +208,11 @@ impl BackgroundTask {
Ok(())
}
async fn cleanup(&mut self) -> anyhow::Result<()> {
async fn cleanup(&self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Leave only up-to-date entries.
let entries_old = self.c.entries.lock().unwrap().clone();
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
let mut entries_new = HashMap::new();
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
if !tenant_shard_id.is_shard_zero() {
@@ -253,32 +225,31 @@ impl BackgroundTask {
for timeline in tenant.list_timelines() {
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
if let Some(entry) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry.lsn {
entries_new.insert(tti, entry.clone());
if let Some(&entry_lsn) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry_lsn {
entries_new.insert(tti, entry_lsn);
}
}
}
}
// Try to remove all entries that are not up-to-date.
for (&tti, entry) in entries_old.iter() {
for (&tti, &lsn) in entries_old.iter() {
if !entries_new.contains_key(&tti) {
self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
.await;
self.remove_entry_sender
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
.unwrap();
}
}
// Note: BackgroundTask is the only writer for self.c.entries,
// so it couldn't have been modified concurrently.
*self.c.entries.lock().unwrap() = entries_new;
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
*self.entries.lock().unwrap() = entries_new;
Ok(())
}
async fn on_startup(&mut self) -> anyhow::Result<()> {
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.c.data_dir)
tokio::fs::create_dir_all(&self.data_dir)
.await
.context("Failed to create basebackup cache data directory")?;
@@ -287,8 +258,8 @@ impl BackgroundTask {
.context("Failed to clean tmp directory")?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
let mut entries = HashMap::new();
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
while let Some(dir_entry) = dir.next_entry().await? {
let filename = dir_entry.file_name();
@@ -297,43 +268,33 @@ impl BackgroundTask {
continue;
}
let size_bytes = dir_entry
.metadata()
.await
.map_err(|e| {
anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
})?
.len();
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
let Some((tenant_id, timeline_id, lsn)) = parsed else {
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
continue;
};
let cur_entry = CacheEntry { lsn, size_bytes };
let tti = TenantTimelineId::new(tenant_id, timeline_id);
use std::collections::hash_map::Entry::*;
match entries.entry(tti) {
Occupied(mut entry) => {
let found_entry = entry.get();
let entry_lsn = *entry.get();
// Leave only the latest entry, remove the old one.
if cur_entry.lsn < found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
.await;
} else if cur_entry.lsn > found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, found_entry)
.await;
entry.insert(cur_entry);
if lsn < entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
lsn,
))?;
} else if lsn > entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
entry_lsn,
))?;
entry.insert(lsn);
} else {
// Two different filenames parsed to the same timline_id and LSN.
// Should never happen.
@@ -344,17 +305,22 @@ impl BackgroundTask {
}
}
Vacant(entry) => {
entry.insert(cur_entry);
entry.insert(lsn);
}
}
}
*self.c.entries.lock().unwrap() = entries;
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
*self.entries.lock().unwrap() = entries;
Ok(())
}
async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
async fn background(
self: Arc<Self>,
mut prepare_receiver: BasebackupPrepareReceiver,
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
) {
// Panic in the background is a safe fallback.
// It will drop receivers and the cache will be effectively disabled.
self.on_startup()
@@ -377,6 +343,11 @@ impl BackgroundTask {
continue;
}
}
Some(req) = remove_entry_receiver.recv() => {
if let Err(e) = tokio::fs::remove_file(req).await {
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
}
}
_ = cleanup_ticker.tick() => {
self.cleanup().await.unwrap_or_else(|e| {
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
@@ -390,67 +361,6 @@ impl BackgroundTask {
}
}
/// Try to remove an entry from disk.
/// The caller is responsible for removing the entry from the in-memory state.
/// Updates size counters and corresponding metrics.
/// Ignores the filesystem errors as not-so-important, but the size counters
/// are not decremented in this case, so the file will continue to be counted
/// towards the size limits.
async fn try_remove_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: &CacheEntry,
) {
let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
match tokio::fs::remove_file(&entry_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::warn!(
"Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
tenant_id,
timeline_id,
entry.lsn,
e
);
return;
}
}
self.entry_count -= 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes -= entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
}
/// Insert the cache entry into in-memory state and update the size counters.
/// Assumes that the file for the entry already exists on disk.
/// If the entry already exists with previous LSN, it will be removed.
async fn upsert_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: CacheEntry,
) {
let tti = TenantTimelineId::new(tenant_id, timeline_id);
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
if let Some(old_entry) = old_entry {
self.try_remove_entry(tenant_id, timeline_id, &old_entry)
.await;
}
}
/// Prepare a basebackup for the given timeline.
///
/// If the basebackup already exists with a higher LSN or the timeline already
@@ -459,7 +369,7 @@ impl BackgroundTask {
/// The basebackup is prepared in a temporary directory and then moved to the final
/// location to make the operation atomic.
async fn prepare_basebackup(
&mut self,
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
req_lsn: Lsn,
@@ -473,50 +383,49 @@ impl BackgroundTask {
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
// TODO(diko): I don't think we will hit the limit,
// but if we do, it makes sense to try to evict oldest entries. here
if self.entry_count >= self.config.max_size_entries {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_size_entries), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
if self.total_size_bytes >= self.config.max_total_size_bytes {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_total_size_bytes), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
{
let entries = self.c.entries.lock().unwrap();
if let Some(entry) = entries.get(&tti) {
if entry.lsn >= req_lsn {
let entries = self.entries.lock().unwrap();
if let Some(&entry_lsn) = entries.get(&tti) {
if entry_lsn >= req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%entry.lsn,
%entry_lsn,
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
if entries.len() as i64 >= self.config.max_size_entries {
tracing::info!(
%timeline_id,
%req_lsn,
"Basebackup cache is full, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
let tenant = self
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let feature_flag = tenant
.feature_resolver
.evaluate_boolean("enable-basebackup-cache", tenant_shard_id.tenant_id);
if feature_flag.is_err() {
tracing::info!(
tenant_id = %tenant_shard_id.tenant_id,
"Basebackup cache is disabled for tenant by feature flag, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
let tenant_state = tenant.current_state();
if tenant_state != TenantState::Active {
anyhow::bail!(
@@ -546,21 +455,18 @@ impl BackgroundTask {
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
.await;
let entry = match res {
Ok(entry) => entry,
Err(err) => {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
if let Err(err) = res {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
return Err(err);
}
};
return Err(err);
}
// Move the tmp file to the final location atomically.
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
@@ -568,13 +474,17 @@ impl BackgroundTask {
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self
.c
.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
.await;
let mut entries = self.entries.lock().unwrap();
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
// Remove the old entry if it exists.
self.remove_entry_sender
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
.unwrap();
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
self.prepare_ok_count.inc();
Ok(())
@@ -587,7 +497,7 @@ impl BackgroundTask {
entry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<CacheEntry> {
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
@@ -627,12 +537,6 @@ impl BackgroundTask {
writer.flush().await?;
writer.into_inner().sync_all().await?;
// TODO(diko): we can count it via Writer wrapper instead of a syscall.
let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
Ok(CacheEntry {
lsn: req_lsn,
size_bytes,
})
Ok(())
}
}

View File

@@ -2,9 +2,7 @@ use std::io::{Read, Write, stdin, stdout};
use std::time::Duration;
use clap::Parser;
use pageserver_api::pagestream_api::{
PagestreamFeMessage, PagestreamRequest, PagestreamTestRequest,
};
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -30,15 +28,17 @@ async fn main() -> anyhow::Result<()> {
let mut msg = 0;
loop {
msg += 1;
let fut = sender.send(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(23),
not_modified_since: Lsn(23),
let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
PagestreamTestRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: Lsn(23),
not_modified_since: Lsn(23),
},
batch_key: 42,
message: format!("message {}", msg),
},
batch_key: 42,
message: format!("message {}", msg),
}));
));
let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
eprintln!("pipe seems full");
break;

View File

@@ -159,7 +159,14 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!("Loaded node metadata: {m}");
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);
let az_id = {
let az_id_from_metadata = m
@@ -188,8 +195,6 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_grpc_addr: m.grpc_host,
listen_grpc_port: m.grpc_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: m.https_port,

View File

@@ -4428,16 +4428,18 @@ pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_entries_total",
"Number of entries in the basebackup cache"
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
// FIXME: Support basebackup cache size metrics.
#[allow(dead_code)]
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_size_bytes",
"Total size of all basebackup cache entries on disk in bytes"
)

View File

@@ -14,7 +14,7 @@ use std::{io, str};
use anyhow::{Context as _, anyhow, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::{Buf as _, BufMut as _, BytesMut};
use bytes::{Buf, BytesMut};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
@@ -25,13 +25,12 @@ use pageserver_api::config::{
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{PageTraceEvent, TenantState};
use pageserver_api::pagestream_api::{
self, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
use pageserver_api::models::{
self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion, PagestreamRequest,
PagestreamProtocolVersion, PagestreamRequest, TenantState,
};
use pageserver_api::reltag::SlruKind;
use pageserver_api::shard::TenantShardId;
@@ -624,6 +623,60 @@ enum PageStreamError {
BadRequest(Cow<'static, str>),
}
impl PageStreamError {
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
/// convenience method for use from a get_pages gRPC stream.
#[allow(clippy::result_large_err)]
fn into_get_page_response(
self,
request_id: page_api::RequestID,
) -> Result<proto::GetPageResponse, tonic::Status> {
use page_api::GetPageStatusCode;
use tonic::Code;
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
let status: tonic::Status = self.into();
let status_code = match status.code() {
// We shouldn't see an OK status here, because we're emitting an error.
Code::Ok => {
debug_assert_ne!(status.code(), Code::Ok);
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// These are per-request errors, returned as GetPageResponses.
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
Code::DataLoss => GetPageStatusCode::InternalError,
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
Code::Internal => GetPageStatusCode::InternalError,
Code::NotFound => GetPageStatusCode::NotFound,
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
// These should terminate the stream.
Code::Aborted => return Err(status),
Code::Cancelled => return Err(status),
Code::DeadlineExceeded => return Err(status),
Code::PermissionDenied => return Err(status),
Code::Unauthenticated => return Err(status),
Code::Unavailable => return Err(status),
Code::Unimplemented => return Err(status),
Code::Unknown => return Err(status),
};
Ok(page_api::GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
}
.into())
}
}
impl From<PageStreamError> for tonic::Status {
fn from(err: PageStreamError) -> Self {
use tonic::Code;
@@ -713,7 +766,7 @@ struct BatchedGetPageRequest {
#[cfg(feature = "testing")]
struct BatchedTestRequest {
req: pagestream_api::PagestreamTestRequest,
req: models::PagestreamTestRequest,
timer: SmgrOpTimer,
}
@@ -727,13 +780,13 @@ enum BatchedFeMessage {
span: Span,
timer: SmgrOpTimer,
shard: WeakHandle<TenantManagerTypes>,
req: PagestreamExistsRequest,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
timer: SmgrOpTimer,
shard: WeakHandle<TenantManagerTypes>,
req: PagestreamNblocksRequest,
req: models::PagestreamNblocksRequest,
},
GetPage {
span: Span,
@@ -745,13 +798,13 @@ enum BatchedFeMessage {
span: Span,
timer: SmgrOpTimer,
shard: WeakHandle<TenantManagerTypes>,
req: PagestreamDbSizeRequest,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
timer: SmgrOpTimer,
shard: WeakHandle<TenantManagerTypes>,
req: PagestreamGetSlruSegmentRequest,
req: models::PagestreamGetSlruSegmentRequest,
},
#[cfg(feature = "testing")]
Test {
@@ -2444,9 +2497,10 @@ impl PageServerHandler {
.map(|(req, res)| {
res.map(|page| {
(
PagestreamBeMessage::GetPage(
pagestream_api::PagestreamGetPageResponse { req: req.req, page },
),
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
req: req.req,
page,
}),
req.timer,
req.ctx,
)
@@ -2513,7 +2567,7 @@ impl PageServerHandler {
.map(|(req, res)| {
res.map(|()| {
(
PagestreamBeMessage::Test(pagestream_api::PagestreamTestResponse {
PagestreamBeMessage::Test(models::PagestreamTestResponse {
req: req.req.clone(),
}),
req.timer,
@@ -3384,8 +3438,8 @@ impl GrpcPageServiceHandler {
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
///
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
/// with an appropriate status code instead.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
@@ -3402,7 +3456,7 @@ impl GrpcPageServiceHandler {
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
let req = page_api::GetPageRequest::try_from(req)?;
let req: page_api::GetPageRequest = req.try_into()?;
span_record!(
req_id = %req.request_id,
@@ -3413,7 +3467,7 @@ impl GrpcPageServiceHandler {
);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
let effective_lsn = PageServerHandler::effective_request_lsn(
let effective_lsn = match PageServerHandler::effective_request_lsn(
&timeline,
timeline.get_last_record_lsn(),
req.read_lsn.request_lsn,
@@ -3421,7 +3475,10 @@ impl GrpcPageServiceHandler {
.not_modified_since_lsn
.unwrap_or(req.read_lsn.request_lsn),
&latest_gc_cutoff_lsn,
)?;
) {
Ok(lsn) => lsn,
Err(err) => return err.into_get_page_response(req.request_id),
};
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
@@ -3478,7 +3535,7 @@ impl GrpcPageServiceHandler {
"unexpected response: {resp:?}"
)));
}
Err(err) => return Err(err.err.into()),
Err(err) => return err.err.into_get_page_response(req.request_id),
};
}
@@ -3544,43 +3601,44 @@ impl proto::PageService for GrpcPageServiceHandler {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate the request and decorate the span.
// Validate the request, decorate the span, and wait for the LSN to arrive.
//
// TODO: this requires a read LSN, is that ok?
Self::ensure_shard_zero(&timeline)?;
if timeline.is_archived() == Some(true) {
return Err(tonic::Status::failed_precondition("timeline is archived"));
}
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
span_record!(lsn=?req.lsn);
span_record!(lsn=%req.read_lsn);
// Wait for the LSN to arrive, if given.
if let Some(lsn) = req.lsn {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
}
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
req.read_lsn.request_lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
// Spawn a task to run the basebackup.
//
// TODO: do we need to support full base backups, for debugging?
let span = Span::current();
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let result = basebackup::send_basebackup_tarball(
&mut simplex_write,
&timeline,
req.lsn,
Some(req.read_lsn.request_lsn),
None,
req.full,
false,
req.replica,
&ctx,
)
@@ -3594,21 +3652,20 @@ impl proto::PageService for GrpcPageServiceHandler {
// Emit chunks of size CHUNK_SIZE.
let chunks = async_stream::try_stream! {
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
loop {
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
loop {
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
if n == 0 {
break; // full chunk or closed stream
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
// If we read 0 bytes, either the chunk is full or the stream is closed.
if n == 0 {
if chunk.is_empty() {
break;
}
yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
chunk.clear();
}
let chunk = chunk.into_inner().freeze();
if chunk.is_empty() {
break;
}
yield proto::GetBaseBackupResponseChunk::from(chunk);
}
// Wait for the basebackup task to exit and check for errors.
jh.await.map_err(|err| {
@@ -3685,16 +3742,9 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
let req_id = req.request_id;
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await;
yield match result {
Ok(resp) => resp,
// Convert per-request errors to GetPageResponses as appropriate, or terminate
// the stream with a tonic::Status.
Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(),
}
.await?
}
};

View File

@@ -21,7 +21,7 @@ OBJS = \
unstable_extensions.o \
walproposer.o \
walproposer_pg.o \
neon_ddl_handler.o \
control_plane_connector.o \
walsender_hooks.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* neon_ddl_handler.c
* control_plane_connector.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control ProcessUtility_hook. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
@@ -13,30 +13,18 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* Support event triggers for neon_superuser
*
* IDENTIFICATION
* contrib/neon/neon_dll_handler.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <curl/curl.h>
#include <unistd.h>
#include "access/xact.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/user.h"
#include "fmgr.h"
#include "libpq/crypt.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/acl.h"
@@ -44,16 +32,11 @@
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/jsonb.h"
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include "neon_ddl_handler.h"
#include "control_plane_connector.h"
#include "neon_utils.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static fmgr_hook_type next_fmgr_hook = NULL;
static needs_fmgr_hook_type next_needs_fmgr_hook = NULL;
static bool neon_event_triggers = true;
static const char *jwt_token = NULL;
@@ -790,7 +773,6 @@ HandleDropRole(DropRoleStmt *stmt)
}
}
static void
HandleRename(RenameStmt *stmt)
{
@@ -800,460 +782,6 @@ HandleRename(RenameStmt *stmt)
return HandleRoleRename(stmt);
}
/*
* Support for Event Triggers.
*
* In vanilla only superuser can create Event Triggers.
*
* We allow it for neon_superuser by temporary switching to superuser. But as
* far as event trigger can fire in superuser context we should protect
* superuser from execution of arbitrary user's code.
*
* The idea was taken from Supabase PR series starting at
* https://github.com/supabase/supautils/pull/98
*/
static bool
neon_needs_fmgr_hook(Oid functionId) {
return (next_needs_fmgr_hook && (*next_needs_fmgr_hook) (functionId))
|| get_func_rettype(functionId) == EVENT_TRIGGEROID;
}
static void
LookupFuncOwnerSecDef(Oid functionId, Oid *funcOwner, bool *is_secdef)
{
Form_pg_proc procForm;
HeapTuple proc_tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionId));
if (!HeapTupleIsValid(proc_tup))
ereport(ERROR,
(errmsg("cache lookup failed for function %u", functionId)));
procForm = (Form_pg_proc) GETSTRUCT(proc_tup);
*funcOwner = procForm->proowner;
*is_secdef = procForm->prosecdef;
ReleaseSysCache(proc_tup);
}
PG_FUNCTION_INFO_V1(noop);
Datum noop(__attribute__ ((unused)) PG_FUNCTION_ARGS) { PG_RETURN_VOID();}
static void
force_noop(FmgrInfo *finfo)
{
finfo->fn_addr = (PGFunction) noop;
finfo->fn_oid = InvalidOid; /* not a known function OID anymore */
finfo->fn_nargs = 0; /* no arguments for noop */
finfo->fn_strict = false;
finfo->fn_retset = false;
finfo->fn_stats = 0; /* no stats collection */
finfo->fn_extra = NULL; /* clear out old context data */
finfo->fn_mcxt = CurrentMemoryContext;
finfo->fn_expr = NULL; /* no parse tree */
}
/*
* Skip executing Event Triggers execution for superusers, because Event
* Triggers are SECURITY DEFINER and user provided code could then attempt
* privilege escalation.
*
* Also skip executing Event Triggers when GUC neon.event_triggers has been
* set to false. This might be necessary to be able to connect again after a
* LOGIN Event Trigger has been installed that would prevent connections as
* neon_superuser.
*/
static void
neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
{
/*
* It can be other needs_fmgr_hook which cause our hook to be invoked for
* non-trigger function, so recheck that is is trigger function.
*/
if (flinfo->fn_oid != InvalidOid &&
get_func_rettype(flinfo->fn_oid) != EVENT_TRIGGEROID)
{
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
return;
}
/*
* The neon_superuser role can use the GUC neon.event_triggers to disable
* firing Event Trigger.
*
* SET neon.event_triggers TO false;
*
* This only applies to the neon_superuser role though, and only allows
* skipping Event Triggers owned by neon_superuser, which we check by
* proxy of the Event Trigger function being owned by neon_superuser.
*
* A role that is created in role neon_superuser should be allowed to also
* benefit from the neon_event_triggers GUC, and will be considered the
* same as the neon_superuser role.
*/
if (event == FHET_START
&& !neon_event_triggers
&& is_neon_superuser())
{
Oid neon_superuser_oid = get_role_oid("neon_superuser", false);
/* Find the Function Attributes (owner Oid, security definer) */
const char *fun_owner_name = NULL;
Oid fun_owner = InvalidOid;
bool fun_is_secdef = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &fun_owner, &fun_is_secdef);
fun_owner_name = GetUserNameFromId(fun_owner, false);
if (RoleIsNeonSuperuser(fun_owner_name)
|| has_privs_of_role(fun_owner, neon_superuser_oid))
{
elog(WARNING,
"Skipping Event Trigger: neon.event_triggers is false");
/*
* we can't skip execution directly inside the fmgr_hook so instead we
* change the event trigger function to a noop function.
*/
force_noop(flinfo);
}
}
/*
* Fire Event Trigger if both function owner and current user are
* superuser, or none of them are.
*/
else if (event == FHET_START
/* still enable it to pass pg_regress tests */
&& !RegressTestMode)
{
/*
* Get the current user oid as of before SECURITY DEFINER change of
* CurrentUserId, and that would be SessionUserId.
*/
Oid current_role_oid = GetSessionUserId();
bool role_is_super = superuser_arg(current_role_oid);
/* Find the Function Attributes (owner Oid, security definer) */
Oid function_owner = InvalidOid;
bool function_is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &function_owner, &function_is_secdef);
function_is_owned_by_super = superuser_arg(function_owner);
/*
* 1. Refuse to run SECURITY DEFINER function that belongs to a
* superuser when the current user is not a superuser itself.
*/
if (!role_is_super
&& function_is_owned_by_super
&& function_is_secdef)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
"and is SECURITY DEFINER",
func_name,
GetUserNameFromId(function_owner, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
/*
* 2. Refuse to run functions that belongs to a non-superuser when the
* current user is a superuser.
*
* We could run a SECURITY DEFINER user-function here and be safe with
* privilege escalation risks, but superuser roles are only used for
* infrastructure maintenance operations, where we prefer to skip
* running user-defined code.
*/
else if (role_is_super && !function_is_owned_by_super)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" "
"is owned by non-superuser role \"%s\", "
"and current_user \"%s\" is superuser",
func_name,
GetUserNameFromId(function_owner, false),
GetUserNameFromId(current_role_oid, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
}
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
}
static Oid prev_role_oid = 0;
static int prev_role_sec_context = 0;
static bool switched_to_superuser = false;
/*
* Switch tp superuser if not yet superuser.
* Returns false if already switched to superuser.
*/
static bool
switch_to_superuser(void)
{
Oid superuser_oid;
if (switched_to_superuser)
return false;
switched_to_superuser = true;
superuser_oid = get_role_oid("cloud_admin", true /*missing_ok*/);
if (superuser_oid == InvalidOid)
superuser_oid = BOOTSTRAP_SUPERUSERID;
GetUserIdAndSecContext(&prev_role_oid, &prev_role_sec_context);
SetUserIdAndSecContext(superuser_oid, prev_role_sec_context |
SECURITY_LOCAL_USERID_CHANGE |
SECURITY_RESTRICTED_OPERATION);
return true;
}
static void
switch_to_original_role(void)
{
SetUserIdAndSecContext(prev_role_oid, prev_role_sec_context);
switched_to_superuser = false;
}
/*
* ALTER ROLE ... SUPERUSER;
*
* Used internally to give superuser to a non-privileged role to allow
* ownership of superuser-only objects such as Event Trigger.
*
* ALTER ROLE foo SUPERUSER;
* ALTER EVENT TRIGGER ... OWNED BY foo;
* ALTER ROLE foo NOSUPERUSER;
*
* Now the EVENT TRIGGER is owned by foo, who can DROP it without having to be
* superuser again.
*/
static void
alter_role_super(const char* rolename, bool make_super)
{
AlterRoleStmt *alter_stmt = makeNode(AlterRoleStmt);
DefElem *defel_superuser =
#if PG_MAJORVERSION_NUM <= 14
makeDefElem("superuser", (Node *) makeInteger(make_super), -1);
#else
makeDefElem("superuser", (Node *) makeBoolean(make_super), -1);
#endif
RoleSpec *rolespec = makeNode(RoleSpec);
rolespec->roletype = ROLESPEC_CSTRING;
rolespec->rolename = pstrdup(rolename);
rolespec->location = -1;
alter_stmt->role = rolespec;
alter_stmt->options = list_make1(defel_superuser);
#if PG_MAJORVERSION_NUM < 15
AlterRole(alter_stmt);
#else
/* ParseState *pstate, AlterRoleStmt *stmt */
AlterRole(NULL, alter_stmt);
#endif
CommandCounterIncrement();
}
/*
* Changes the OWNER of an Event Trigger.
*
* Event Triggers can only be owned by superusers, so this ALTER ROLE with
* SUPERUSER and then removes the property.
*/
static void
alter_event_trigger_owner(const char *obj_name, Oid role_oid)
{
char* role_name = GetUserNameFromId(role_oid, false);
alter_role_super(role_name, true);
AlterEventTriggerOwner(obj_name, role_oid);
CommandCounterIncrement();
alter_role_super(role_name, false);
}
/*
* Neon processing of the CREATE EVENT TRIGGER requires special attention and
* is worth having its own ProcessUtility_hook for that.
*/
static void
ProcessCreateEventTrigger(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
bool sudo = false;
/* We double-check that after local variable declaration block */
CreateEventTrigStmt *stmt = (CreateEventTrigStmt *) parseTree;
/*
* We are going to change the current user privileges (sudo) and might
* need after execution cleanup. For that we want to capture the UserId
* before changing it for our sudo implementation.
*/
const Oid current_user_id = GetUserId();
bool current_user_is_super = superuser_arg(current_user_id);
if (nodeTag(parseTree) != T_CreateEventTrigStmt)
{
ereport(ERROR,
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("ProcessCreateEventTrigger called for the wrong command"));
}
/*
* Allow neon_superuser to create Event Trigger, while keeping the
* ownership of the object.
*
* For that we give superuser membership to the role for the execution of
* the command.
*/
if (IsTransactionState() && is_neon_superuser())
{
/* Find the Event Trigger function Oid */
Oid func_oid = LookupFuncName(stmt->funcname, 0, NULL, false);
/* Find the Function Owner Oid */
Oid func_owner = InvalidOid;
bool is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(func_oid, &func_owner, &is_secdef);
function_is_owned_by_super = superuser_arg(func_owner);
if(!current_user_is_super && function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a superuser role"),
errdetail("current user \"%s\" is not a superuser "
"and Event Trigger function \"%s\" "
"is owned by a superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
if(current_user_is_super && !function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a non-superuser role"),
errdetail("current user \"%s\" is a superuser "
"and function \"%s\" is "
"owned by a non-superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
sudo = switch_to_superuser();
}
PG_TRY();
{
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
/*
* Now that the Event Trigger has been installed via our sudo
* mechanism, if the original role was not a superuser then change
* the event trigger ownership back to the original role.
*
* That way [ ALTER | DROP ] EVENT TRIGGER commands just work.
*/
if (IsTransactionState() && is_neon_superuser())
{
if (!current_user_is_super)
{
/*
* Change event trigger owner to the current role (making
* it a privileged role during the ALTER OWNER command).
*/
alter_event_trigger_owner(stmt->trigname, current_user_id);
}
}
}
PG_FINALLY();
{
if (sudo)
switch_to_original_role();
}
PG_END_TRY();
}
/*
* Neon hooks for DDLs (handling privileges, limiting features, etc).
*/
static void
NeonProcessUtility(
PlannedStmt *pstmt,
@@ -1267,27 +795,6 @@ NeonProcessUtility(
{
Node *parseTree = pstmt->utilityStmt;
/*
* The process utility hook for CREATE EVENT TRIGGER is its own
* implementation and warrant being addressed separately from here.
*/
if (nodeTag(parseTree) == T_CreateEventTrigStmt)
{
ProcessCreateEventTrigger(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
return;
}
/*
* Other commands that need Neon specific implementations are handled here:
*/
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
@@ -1326,82 +833,37 @@ NeonProcessUtility(
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
/*
* Only neon_superuser is granted privilege to edit neon.event_triggers GUC.
*/
static void
neon_event_triggers_assign_hook(bool newval, void *extra)
{
/* MyDatabaseId == InvalidOid || !OidIsValid(GetUserId()) */
if (IsTransactionState() && !is_neon_superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to set neon.event_triggers"),
errdetail("Only \"neon_superuser\" is allowed to set the GUC")));
}
}
void
InitDDLHandler()
InitControlPlaneConnector()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
next_needs_fmgr_hook = needs_fmgr_hook;
needs_fmgr_hook = neon_needs_fmgr_hook;
next_fmgr_hook = fmgr_hook;
fmgr_hook = neon_fmgr_hook;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
/*
* The GUC neon.event_triggers should provide the same effect as the
* Postgres GUC event_triggers, but the neon one is PGC_USERSET.
*
* This allows using the GUC in the connection string and work out of a
* LOGIN Event Trigger that would break database access, all without
* having to edit and reload the Postgres configuration file.
*/
DefineCustomBoolVariable(
"neon.event_triggers",
"Enable firing of event triggers",
NULL,
&neon_event_triggers,
true,
PGC_USERSET,
0,
NULL,
neon_event_triggers_assign_hook,
NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, which will be forwarded changes to dbs and roles",

View File

@@ -0,0 +1,6 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector(void);
#endif

View File

@@ -33,9 +33,9 @@
#include "extension_server.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_ddl_handler.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "control_plane_connector.h"
#include "logical_replication_monitor.h"
#include "unstable_extensions.h"
#include "walsender_hooks.h"
@@ -454,7 +454,7 @@ _PG_init(void)
InitUnstableExtensionsSupport();
InitLogicalReplicationMonitor();
InitDDLHandler();
InitControlPlaneConnector();
pg_init_extension_server();

View File

@@ -1,6 +1,6 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.6'
default_version = '1.5'
module_pathname = '$libdir/neon'
relocatable = true
trusted = true

View File

@@ -1,6 +0,0 @@
#ifndef CONTROL_DDL_HANDLER_H
#define CONTROL_DDL_HANDLER_H
void InitDDLHandler(void);
#endif

View File

@@ -679,7 +679,8 @@ typedef struct walproposer_api
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn) __attribute__((noreturn)) ;
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);
/*
* Called after every AppendResponse from the safekeeper. Used to
* propagate backpressure feedback and to confirm WAL persistence (has

View File

@@ -1890,7 +1890,7 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
return rc;
}
static void __attribute__((noreturn))
static void
walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
{
fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(lsn));

View File

@@ -499,7 +499,7 @@ impl ApiImpl for SimulationApi {
true
}
fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
fn finish_sync_safekeepers(&self, lsn: u64) {
debug!("finish_sync_safekeepers, lsn={}", lsn);
executor::exit(0, Lsn(lsn).to_string());
}

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;

View File

@@ -5,11 +5,10 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
@@ -421,31 +420,23 @@ impl ComputeHook {
preferred_az: _preferred_az,
} = reconfigure_request;
let compute_pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
})
.collect::<Vec<_>>();
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {endpoint_name}");
let pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
if endpoint.grpc {
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
}
})
.collect::<Vec<_>>();
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint
.reconfigure(pageservers, *stripe_size, None)
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
.await
.map_err(NotifyError::NeonLocal)?;
}

View File

@@ -2070,10 +2070,10 @@ pub fn make_router(
router
.data(Arc::new(HttpState::new(service, auth, build_info)))
// Non-prefixed generic endpoints (status, metrics, profiling)
.get("/metrics", |r| {
named_request_span(r, measured_metrics_handler, RequestName("metrics"))
})
// Non-prefixed generic endpoints (status, metrics, profiling)
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})

View File

@@ -97,7 +97,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<SafekeeperRequestLabelGroupSet>,
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
@@ -111,7 +111,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<SafekeeperRequestLabelGroupSet, 5>,
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
@@ -136,17 +136,16 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
/// Indicator of stucked (long-running) reconciles, broken down by tenant, shard and sequence.
/// The metric is automatically removed once the reconciliation completes.
/// HTTP request status counters for handled requests
pub(crate) storage_controller_reconcile_long_running:
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
/// Indicator of safekeeper reconciler queue depth, broken down by safekeeper, excluding ongoing reconciles.
pub(crate) storage_controller_safekeeper_reconciles_queued:
pub(crate) storage_controller_safkeeper_reconciles_queued:
measured::GaugeVec<SafekeeperReconcilerLabelGroupSet>,
/// Indicator of completed safekeeper reconciles, broken down by safekeeper.
pub(crate) storage_controller_safekeeper_reconciles_complete:
pub(crate) storage_controller_safkeeper_reconciles_complete:
measured::CounterVec<SafekeeperReconcilerLabelGroupSet>,
}
@@ -219,16 +218,6 @@ pub(crate) struct PageserverRequestLabelGroup<'a> {
pub(crate) method: Method,
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = SafekeeperRequestLabelGroupSet)]
pub(crate) struct SafekeeperRequestLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) safekeeper_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) path: &'a str,
pub(crate) method: Method,
}
#[derive(measured::LabelGroup)]
#[label(set = DatabaseQueryErrorLabelGroupSet)]
pub(crate) struct DatabaseQueryErrorLabelGroup {

View File

@@ -37,8 +37,6 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
@@ -102,8 +100,8 @@ impl Node {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
// Note: HTTPS and gRPC addresses may change, to allow for migrations. See
// [`Self::need_update`] for more details.
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
// && self.listen_https_port == register_req.listen_https_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
@@ -111,10 +109,9 @@ impl Node {
// Do we need to update an existing record in DB on this registration request?
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
// These are checked here, since they may change before we're fully migrated.
// listen_https_port is checked here because it may change during migration to https.
// After migration, this check may be moved to registration_match.
self.listen_https_port != register_req.listen_https_port
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|| self.listen_grpc_port != register_req.listen_grpc_port
}
/// For a shard located on this node, populate a response object
@@ -128,8 +125,6 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
@@ -216,8 +211,6 @@ impl Node {
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
use_https: bool,
) -> anyhow::Result<Self> {
@@ -228,10 +221,6 @@ impl Node {
);
}
if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
}
Ok(Self {
id,
listen_http_addr,
@@ -239,8 +228,6 @@ impl Node {
listen_https_port,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
scheduling: NodeSchedulingPolicy::Active,
lifecycle: NodeLifecycle::Active,
availability: NodeAvailability::Offline,
@@ -260,8 +247,6 @@ impl Node {
listen_https_port: self.listen_https_port.map(|x| x as i32),
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
@@ -275,13 +260,6 @@ impl Node {
);
}
if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
anyhow::bail!(
"can't load node {}: must specify both gRPC address and port",
np.node_id
);
}
Ok(Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
@@ -294,8 +272,6 @@ impl Node {
listen_https_port: np.listen_https_port.map(|x| x as u16),
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
listen_grpc_addr: np.listen_grpc_addr,
listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
availability_zone_id: AvailabilityZone(np.availability_zone_id),
use_https,
cancel: CancellationToken::new(),
@@ -385,8 +361,6 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
}

View File

@@ -2125,8 +2125,6 @@ pub(crate) struct NodePersistence {
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
pub(crate) lifecycle: String,
pub(crate) listen_grpc_addr: Option<String>,
pub(crate) listen_grpc_port: Option<i32>,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -5,7 +5,7 @@ use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use crate::metrics::SafekeeperRequestLabelGroup;
use crate::metrics::PageserverRequestLabelGroup;
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -19,8 +19,8 @@ pub(crate) struct SafekeeperClient {
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = SafekeeperRequestLabelGroup {
safekeeper_id: $node_id,
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
@@ -35,7 +35,7 @@ macro_rules! measured_request {
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_error;
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}

View File

@@ -945,8 +945,6 @@ pub(crate) mod test_utils {
None,
format!("pghost-{i}"),
5432 + i as u16,
Some(format!("grpchost-{i}")),
Some(51051 + i as u16),
az_iter
.next()
.cloned()

View File

@@ -34,8 +34,6 @@ diesel::table! {
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
lifecycle -> Varchar,
listen_grpc_addr -> Nullable<Varchar>,
listen_grpc_port -> Nullable<Int4>,
}
}

View File

@@ -1683,8 +1683,6 @@ impl Service {
None,
"".to_string(),
123,
None,
None,
AvailabilityZone("test_az".to_string()),
false,
)
@@ -7256,12 +7254,6 @@ impl Service {
));
}
if register_req.listen_grpc_addr.is_some() != register_req.listen_grpc_port.is_some() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"must specify both gRPC address and port"
)));
}
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
@@ -7272,8 +7264,6 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
);
@@ -8778,22 +8768,15 @@ impl Service {
let waiter_count = waiters.len();
match self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
Ok(()) => {}
Err(ReconcileWaitError::Failed(_, reconcile_error))
if matches!(*reconcile_error, ReconcileError::Cancel) =>
{
// Ignore reconciler cancel errors: this reconciler might have shut down
// because some other change superceded it. We will return a nonzero number,
// so the caller knows they might have to call again to quiesce the system.
}
Err(e) => {
if let ReconcileWaitError::Failed(_, reconcile_error) = &e {
match **reconcile_error {
ReconcileError::Cancel
| ReconcileError::Remote(mgmt_api::Error::Cancelled) => {
// Ignore reconciler cancel errors: this reconciler might have shut down
// because some other change superceded it. We will return a nonzero number,
// so the caller knows they might have to call again to quiesce the system.
}
_ => {
return Err(e);
}
}
} else {
return Err(e);
}
return Err(e);
}
};

View File

@@ -107,7 +107,7 @@ impl ChaosInjector {
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}

View File

@@ -230,7 +230,7 @@ impl ReconcilerHandle {
// increase it before putting into the queue.
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_reconciles_queued;
.storage_controller_safkeeper_reconciles_queued;
let label_group = SafekeeperReconcilerLabelGroup {
sk_az: &sk_az,
sk_node_id: &sk_node_id,
@@ -306,7 +306,7 @@ impl SafekeeperReconciler {
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_reconciles_queued;
.storage_controller_safkeeper_reconciles_queued;
queued_gauge.set(
SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
@@ -547,7 +547,7 @@ impl SafekeeperReconcilerInner {
let complete_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_reconciles_complete;
.storage_controller_safkeeper_reconciles_complete;
complete_counter.inc(SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
sk_node_id: &req.safekeeper.get_id().to_string(),

View File

@@ -1184,19 +1184,11 @@ impl TenantShard {
for secondary in self.intent.get_secondary() {
// Make sure we don't try to migrate a secondary to our attached location: this case happens
// easily in environments without multiple AZs.
let mut exclude = match self.intent.attached {
let exclude = match self.intent.attached {
Some(attached) => vec![attached],
None => vec![],
};
// Exclude all other secondaries from the scheduling process to avoid replacing
// one existing secondary with another existing secondary.
for another_secondary in self.intent.secondary.iter() {
if another_secondary != secondary {
exclude.push(*another_secondary);
}
}
let replacement = match &self.policy {
PlacementPolicy::Attached(_) => {
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
@@ -1356,19 +1348,28 @@ impl TenantShard {
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
/// funciton should not be used to decide whether to reconcile.
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
// We have an intent to attach for this node
let attach_intent = self.intent.attached?;
// We have an observed state for this node
let location = self.observed.locations.get(&attach_intent)?;
// Our observed state is not None, i.e. not in flux
let location_config = location.conf.as_ref()?;
// Check if our intent and observed state agree that this node is in an attached state.
match location_config.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => Some(attach_intent),
_ => None,
if let Some(attach_intent) = self.intent.attached {
match self.observed.locations.get(&attach_intent) {
Some(loc) => match &loc.conf {
Some(conf) => match conf.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// Our intent and observed state agree that this node is in an attached state.
Some(attach_intent)
}
// Our observed config is not an attached state
_ => None,
},
// Our observed state is None, i.e. in flux
None => None,
},
// We have no observed state for this node
None => None,
}
} else {
// Our intent is not to attach
None
}
}

View File

@@ -497,7 +497,6 @@ class NeonLocalCli(AbstractNeonCli):
tenant_id: TenantId,
pg_version: PgVersion,
endpoint_id: str | None = None,
grpc: bool | None = None,
hot_standby: bool = False,
lsn: Lsn | None = None,
pageserver_id: int | None = None,
@@ -522,8 +521,6 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--external-http-port", str(external_http_port)])
if internal_http_port is not None:
args.extend(["--internal-http-port", str(internal_http_port)])
if grpc:
args.append("--grpc")
if endpoint_id is not None:
args.append(endpoint_id)
if hot_standby:
@@ -567,7 +564,6 @@ class NeonLocalCli(AbstractNeonCli):
basebackup_request_tries: int | None = None,
timeout: str | None = None,
env: dict[str, str] | None = None,
dev: bool = False,
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
@@ -593,8 +589,6 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--create-test-user"])
if timeout is not None:
args.extend(["--start-timeout", str(timeout)])
if dev:
args.extend(["--dev"])
res = self.raw_cli(args, extra_env_vars)
res.check_returncode()
@@ -623,7 +617,7 @@ class NeonLocalCli(AbstractNeonCli):
destroy=False,
check_return_code=True,
mode: str | None = None,
) -> tuple[Lsn | None, subprocess.CompletedProcess[str]]:
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
"stop",
@@ -635,11 +629,7 @@ class NeonLocalCli(AbstractNeonCli):
if endpoint_id is not None:
args.append(endpoint_id)
proc = self.raw_cli(args, check_return_code=check_return_code)
log.debug(f"endpoint stop stdout: {proc.stdout}")
lsn_str = proc.stdout.split()[-1]
lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str)
return lsn, proc
return self.raw_cli(args, check_return_code=check_return_code)
def mappings_map_branch(
self, name: str, tenant_id: TenantId, timeline_id: TimelineId

View File

@@ -1228,7 +1228,6 @@ class NeonEnv:
):
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
grpc=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
https=self.port_distributor.get_port() if config.use_https_pageserver_api else None,
)
@@ -1244,14 +1243,13 @@ class NeonEnv:
ps_cfg: dict[str, Any] = {
"id": ps_id,
"listen_pg_addr": f"localhost:{pageserver_port.pg}",
"listen_grpc_addr": f"localhost:{pageserver_port.grpc}",
"listen_http_addr": f"localhost:{pageserver_port.http}",
"listen_https_addr": f"localhost:{pageserver_port.https}"
if config.use_https_pageserver_api
else None,
"pg_auth_type": pg_auth_type,
"grpc_auth_type": grpc_auth_type,
"http_auth_type": http_auth_type,
"grpc_auth_type": grpc_auth_type,
"availability_zone": availability_zone,
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
@@ -1764,7 +1762,6 @@ def neon_env_builder(
@dataclass
class PageserverPort:
pg: int
grpc: int
http: int
https: int | None = None
@@ -2363,7 +2360,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
delay_max = max_interval
while n > 0:
n = self.reconcile_all()
if n == 0:
break
elif time.time() - start_at > timeout_secs:
@@ -4196,8 +4192,6 @@ class Endpoint(PgProtocol, LogUtils):
self._running = threading.Semaphore(0)
self.__jwt: str | None = None
self.terminate_flush_lsn: Lsn | None = None
def http_client(self, retries: Retry | None = None) -> EndpointHttpClient:
assert self.__jwt is not None
return EndpointHttpClient(
@@ -4210,7 +4204,6 @@ class Endpoint(PgProtocol, LogUtils):
self,
branch_name: str,
endpoint_id: str | None = None,
grpc: bool | None = None,
hot_standby: bool = False,
lsn: Lsn | None = None,
config_lines: list[str] | None = None,
@@ -4235,7 +4228,6 @@ class Endpoint(PgProtocol, LogUtils):
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
grpc=grpc,
hot_standby=hot_standby,
pg_port=self.pg_port,
external_http_port=self.external_http_port,
@@ -4502,10 +4494,9 @@ class Endpoint(PgProtocol, LogUtils):
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
lsn, _ = self.env.neon_cli.endpoint_stop(
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
self.terminate_flush_lsn = lsn
if sks_wait_walreceiver_gone is not None:
for sk in sks_wait_walreceiver_gone[0]:
@@ -4523,10 +4514,9 @@ class Endpoint(PgProtocol, LogUtils):
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
lsn, _ = self.env.neon_cli.endpoint_stop(
self.env.neon_cli.endpoint_stop(
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
)
self.terminate_flush_lsn = lsn
self.endpoint_id = None
return self
@@ -4535,7 +4525,6 @@ class Endpoint(PgProtocol, LogUtils):
self,
branch_name: str,
endpoint_id: str | None = None,
grpc: bool | None = None,
hot_standby: bool = False,
lsn: Lsn | None = None,
config_lines: list[str] | None = None,
@@ -4553,7 +4542,6 @@ class Endpoint(PgProtocol, LogUtils):
branch_name=branch_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
grpc=grpc,
hot_standby=hot_standby,
lsn=lsn,
pageserver_id=pageserver_id,
@@ -4641,7 +4629,6 @@ class EndpointFactory:
endpoint_id: str | None = None,
tenant_id: TenantId | None = None,
lsn: Lsn | None = None,
grpc: bool | None = None,
hot_standby: bool = False,
config_lines: list[str] | None = None,
remote_ext_base_url: str | None = None,
@@ -4661,7 +4648,6 @@ class EndpointFactory:
return ep.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
grpc=grpc,
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
@@ -4676,7 +4662,6 @@ class EndpointFactory:
endpoint_id: str | None = None,
tenant_id: TenantId | None = None,
lsn: Lsn | None = None,
grpc: bool | None = None,
hot_standby: bool = False,
config_lines: list[str] | None = None,
pageserver_id: int | None = None,
@@ -4699,7 +4684,6 @@ class EndpointFactory:
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=lsn,
grpc=grpc,
hot_standby=hot_standby,
config_lines=config_lines,
pageserver_id=pageserver_id,
@@ -4724,7 +4708,6 @@ class EndpointFactory:
self,
origin: Endpoint,
endpoint_id: str | None = None,
grpc: bool | None = None,
config_lines: list[str] | None = None,
) -> Endpoint:
branch_name = origin.branch_name
@@ -4736,7 +4719,6 @@ class EndpointFactory:
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
grpc=grpc,
hot_standby=True,
config_lines=config_lines,
)
@@ -4745,7 +4727,6 @@ class EndpointFactory:
self,
origin: Endpoint,
endpoint_id: str | None = None,
grpc: bool | None = None,
config_lines: list[str] | None = None,
) -> Endpoint:
branch_name = origin.branch_name
@@ -4757,7 +4738,6 @@ class EndpointFactory:
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
grpc=grpc,
hot_standby=True,
config_lines=config_lines,
)

View File

@@ -1,22 +0,0 @@
-- add 100000 rows or approximately 11 MB to the action_blocks table
-- takes about 1 second
INSERT INTO workflows.action_blocks (
id,
uuid,
created_at,
status,
function_signature,
reference_id,
blocking,
run_synchronously
)
SELECT
id,
uuid_generate_v4(),
now() - (random() * interval '100 days'), -- Random date within the last 100 days
'CONDITIONS_NOT_MET',
'function_signature_' || id, -- Create a unique function signature using id
CASE WHEN random() > 0.5 THEN 'reference_' || id ELSE NULL END, -- 50% chance of being NULL
true,
CASE WHEN random() > 0.5 THEN true ELSE false END -- Random boolean value
FROM generate_series(1, 100000) AS id;

View File

@@ -1,11 +0,0 @@
-- add 100000 rows or approximately 10 MB to the action_kwargs table
-- takes about 5 minutes
INSERT INTO workflows.action_kwargs (created_at, key, uuid, value_id, state_value_id, action_block_id)
SELECT
now(), -- Using the default value for `created_at`
'key_' || gs.id, -- Generating a unique key based on the id
uuid_generate_v4(), -- Generating a new UUID for each row
CASE WHEN gs.id % 2 = 0 THEN gs.id ELSE NULL END, -- Setting value_id for even ids
CASE WHEN gs.id % 2 <> 0 THEN gs.id ELSE NULL END, -- Setting state_value_id for odd ids
1 -- Setting action_block_id as 1 for simplicity
FROM generate_series(1, 100000) AS gs(id);

View File

@@ -1,56 +0,0 @@
-- add 100000 rows or approx. 30 MB to the device_fingerprint_event table
-- takes about 4 minutes
INSERT INTO authentication.device_fingerprint_event (
uuid,
created_at,
identity_uuid,
fingerprint_request_id,
fingerprint_id,
confidence_score,
ip_address,
url,
client_referrer,
last_seen_at,
raw_fingerprint_response,
session_uuid,
fingerprint_response,
browser_version,
browser_name,
device,
operating_system,
operating_system_version,
user_agent,
ip_address_location_city,
ip_address_location_region,
ip_address_location_country_code,
ip_address_location_latitude,
ip_address_location_longitude,
is_incognito
)
SELECT
gen_random_uuid(), -- Generates a random UUID for primary key
now() - (random() * interval '10 days'), -- Random timestamp within the last 10 days
gen_random_uuid(), -- Random UUID for identity
md5(gs::text), -- Simulates unique fingerprint request ID using `md5` hash of series number
md5((gs + 10000)::text), -- Simulates unique fingerprint ID
round(CAST(random() AS numeric), 2), -- Generates a random score between 0 and 1, cast `random()` to numeric
'192.168.' || (random() * 255)::int || '.' || (random() * 255)::int, -- Random IP address
'https://example.com/' || (gs % 1000), -- Random URL with series number suffix
CASE WHEN random() < 0.5 THEN NULL ELSE 'https://referrer.com/' || (gs % 100)::text END, -- Random referrer, 50% chance of being NULL
now() - (random() * interval '5 days'), -- Last seen timestamp within the last 5 days
NULL, -- Keeping raw_fingerprint_response NULL for simplicity
CASE WHEN random() < 0.3 THEN gen_random_uuid() ELSE NULL END, -- Session UUID, 30% chance of NULL
NULL, -- Keeping fingerprint_response NULL for simplicity
CASE WHEN random() < 0.5 THEN '93.0' ELSE '92.0' END, -- Random browser version
CASE WHEN random() < 0.5 THEN 'Firefox' ELSE 'Chrome' END, -- Random browser name
CASE WHEN random() < 0.5 THEN 'Desktop' ELSE 'Mobile' END, -- Random device type
'Windows', -- Static value for operating system
'10.0', -- Static value for operating system version
'Mozilla/5.0', -- Static value for user agent
'City ' || (gs % 1000)::text, -- Random city name
'Region ' || (gs % 100)::text, -- Random region name
'US', -- Static country code
random() * 180 - 90, -- Random latitude between -90 and 90
random() * 360 - 180, -- Random longitude between -180 and 180
random() < 0.1 -- 10% chance of being incognito
FROM generate_series(1, 100000) AS gs;

View File

@@ -1,10 +0,0 @@
-- add 100000 rows or approximately 11 MB to the edges table
-- takes about 1 minute
INSERT INTO workflows.edges (created_at, workflow_id, uuid, from_vertex_id, to_vertex_id)
SELECT
now() - (random() * interval '365 days'), -- Random `created_at` timestamp in the last year
(random() * 100)::int + 1, -- Random `workflow_id` between 1 and 100
uuid_generate_v4(), -- Generate a new UUID for each row
(random() * 100000)::bigint + 1, -- Random `from_vertex_id` between 1 and 100,000
(random() * 100000)::bigint + 1 -- Random `to_vertex_id` between 1 and 100,000
FROM generate_series(1, 100000) AS gs; -- Generate 100,000 sequential IDs

View File

@@ -1,21 +0,0 @@
-- add 100000 rows or approximately 10 MB to the hotel_rate_mapping table
-- takes about 1 second
INSERT INTO booking_inventory.hotel_rate_mapping (
uuid,
created_at,
updated_at,
hotel_rate_id,
remote_id,
source
)
SELECT
uuid_generate_v4(), -- Unique UUID for each row
now(), -- Created at timestamp
now(), -- Updated at timestamp
'rate_' || gs AS hotel_rate_id, -- Unique hotel_rate_id
'remote_' || gs AS remote_id, -- Unique remote_id
CASE WHEN gs % 3 = 0 THEN 'source_1'
WHEN gs % 3 = 1 THEN 'source_2'
ELSE 'source_3'
END AS source -- Distributing sources among three options
FROM generate_series(1, 100000) AS gs;

View File

@@ -1,31 +0,0 @@
-- add 100000 rows or approximately 20 MB to the ocr_pipeline_results_version table
-- takes about 1 second
INSERT INTO ocr.ocr_pipeline_results_version (
id, transaction_id, operation_type, created_at, updated_at, s3_filename, completed_at, result,
end_transaction_id, pipeline_type, is_async, callback, callback_kwargs, input, error, file_type, s3_bucket_name, pipeline_kwargs
)
SELECT
gs.aid, -- id
gs.aid, -- transaction_id (same as id for simplicity)
(gs.aid % 5)::smallint + 1, -- operation_type (cyclic values from 1 to 5)
now() - interval '1 day' * (random() * 30), -- created_at (random timestamp within the last 30 days)
now() - interval '1 day' * (random() * 30), -- updated_at (random timestamp within the last 30 days)
's3_file_' || gs.aid || '.txt', -- s3_filename (synthetic filename)
now() - interval '1 day' * (random() * 30), -- completed_at (random timestamp within the last 30 days)
'{}'::jsonb, -- result (empty JSON object)
NULL, -- end_transaction_id (NULL)
CASE (gs.aid % 3) -- pipeline_type (cyclic text values)
WHEN 0 THEN 'OCR'
WHEN 1 THEN 'PDF'
ELSE 'Image'
END,
gs.aid % 2 = 0, -- is_async (alternating between true and false)
'http://callback/' || gs.aid, -- callback (synthetic URL)
'{}'::jsonb, -- callback_kwargs (empty JSON object)
'Input text ' || gs.aid, -- input (synthetic input text)
NULL, -- error (NULL)
'pdf', -- file_type (default to 'pdf')
'bucket_' || gs.aid % 10, -- s3_bucket_name (synthetic bucket names)
'{}'::jsonb -- pipeline_kwargs (empty JSON object)
FROM
generate_series(1, 100000) AS gs(aid);

View File

@@ -1,18 +0,0 @@
-- add 100000 rows or approx. 20 MB to the priceline_raw_response table
-- takes about 20 seconds
INSERT INTO booking_inventory.priceline_raw_response (
uuid, created_at, updated_at, url, base_url, path, method, params, request, response
)
SELECT
gen_random_uuid(), -- Generate random UUIDs
now() - (random() * interval '30 days'), -- Random creation time within the past 30 days
now() - (random() * interval '30 days'), -- Random update time within the past 30 days
'https://example.com/resource/' || gs, -- Construct a unique URL for each row
'https://example.com', -- Base URL for all rows
'/resource/' || gs, -- Path for each row
CASE WHEN gs % 2 = 0 THEN 'GET' ELSE 'POST' END, -- Alternate between GET and POST methods
'id=' || gs, -- Simple parameter pattern for each row
'{}'::jsonb, -- Empty JSON object for request
jsonb_build_object('status', 'success', 'data', gs) -- Construct a valid JSON response
FROM
generate_series(1, 100000) AS gs;

View File

@@ -1,26 +0,0 @@
-- add 100000 rows or approx. 1 MB to the relabeled_transactions table
-- takes about 1 second
INSERT INTO heron.relabeled_transactions (
id,
created_at,
universal_transaction_id,
raw_result,
category,
category_confidence,
merchant,
batch_id
)
SELECT
gs.aid AS id,
now() - (gs.aid % 1000) * interval '1 second' AS created_at,
'txn_' || gs.aid AS universal_transaction_id,
'{}'::jsonb AS raw_result,
CASE WHEN gs.aid % 5 = 0 THEN 'grocery'
WHEN gs.aid % 5 = 1 THEN 'electronics'
WHEN gs.aid % 5 = 2 THEN 'clothing'
WHEN gs.aid % 5 = 3 THEN 'utilities'
ELSE NULL END AS category,
ROUND(RANDOM()::numeric, 2) AS category_confidence,
CASE WHEN gs.aid % 2 = 0 THEN 'Merchant_' || gs.aid % 20 ELSE NULL END AS merchant,
gs.aid % 100 + 1 AS batch_id
FROM generate_series(1, 100000) AS gs(aid);

View File

@@ -1,9 +0,0 @@
-- add 100000 rows or approx.10 MB to the state_values table
-- takes about 14 seconds
INSERT INTO workflows.state_values (key, workflow_id, state_type, value_id)
SELECT
'key_' || gs::text, -- Key: Generate as 'key_1', 'key_2', etc.
(gs - 1) / 1000 + 1, -- workflow_id: Distribute over a range (1000 workflows)
'STATIC', -- state_type: Use constant 'STATIC' as defined in schema
gs::bigint -- value_id: Use the same as the series value
FROM generate_series(1, 100000) AS gs; -- Generate 100,000 rows

View File

@@ -1,30 +0,0 @@
-- add 100000 rows or approx. 24 MB to the values table
-- takes about 126 seconds
INSERT INTO workflows.values (
id,
type,
int_value,
string_value,
child_type,
bool_value,
uuid,
numeric_value,
workflow_id,
jsonb_value,
parent_value_id
)
SELECT
gs AS id,
'TYPE_A' AS type,
CASE WHEN selector = 1 THEN gs ELSE NULL END AS int_value,
CASE WHEN selector = 2 THEN 'string_value_' || gs::text ELSE NULL END AS string_value,
'CHILD_TYPE_A' AS child_type, -- Always non-null
CASE WHEN selector = 3 THEN (gs % 2 = 0) ELSE NULL END AS bool_value,
uuid_generate_v4() AS uuid, -- Always non-null
CASE WHEN selector = 4 THEN gs * 1.0 ELSE NULL END AS numeric_value,
(array[1, 2, 3, 4, 5])[gs % 5 + 1] AS workflow_id, -- Use only existing workflow IDs
CASE WHEN selector = 5 THEN ('{"key":' || gs::text || '}')::jsonb ELSE NULL END AS jsonb_value,
(gs % 100) + 1 AS parent_value_id -- Always non-null
FROM
generate_series(1, 100000) AS gs,
(SELECT floor(random() * 5 + 1)::int AS selector) AS s;

View File

@@ -1,26 +0,0 @@
-- add 100000 rows or approx. 18 MB to the vertices table
-- takes about 90 seconds
INSERT INTO workflows.vertices(
uuid,
created_at,
condition_block_id,
operator,
has_been_visited,
reference_id,
workflow_id,
meta_data,
-- id,
action_block_id
)
SELECT
uuid_generate_v4() AS uuid,
now() AS created_at,
CASE WHEN (gs % 2 = 0) THEN gs % 10 ELSE NULL END AS condition_block_id, -- Every alternate row has a condition_block_id
'operator_' || (gs % 10) AS operator, -- Cyclical operator values (e.g., operator_0, operator_1)
false AS has_been_visited,
'ref_' || gs AS reference_id, -- Unique reference_id for each row
(gs % 1000) + 1 AS workflow_id, -- Random workflow_id values between 1 and 1000
'{}'::jsonb AS meta_data, -- Empty JSON metadata
-- gs AS id, -- default from sequence to get unique ID
CASE WHEN (gs % 2 = 1) THEN gs ELSE NULL END AS action_block_id -- Complementary to condition_block_id
FROM generate_series(1, 100000) AS gs;

View File

@@ -1,9 +0,0 @@
-- update approximately 2000 rows or 200 kb in the accounting_coding_body_tracking_category_selection table
-- takes about 1 second
UPDATE accounting.accounting_coding_body_tracking_category_selection
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM accounting.accounting_coding_body_tracking_category_selection
TABLESAMPLE SYSTEM (0.0005)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 9000 rows or 1 MB in the action_blocks table
-- takes about 1 second
UPDATE workflows.action_blocks
SET run_synchronously = NOT run_synchronously
WHERE ctid in (
SELECT ctid
FROM workflows.action_blocks
TABLESAMPLE SYSTEM (0.001)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 5000 rows or 1 MB in the action_kwargs table
-- takes about 1 second
UPDATE workflows.action_kwargs
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM workflows.action_kwargs
TABLESAMPLE SYSTEM (0.0002)
);

View File

@@ -1,10 +0,0 @@
-- update approximately 3000 rows or 500 KB in the denormalized_approval_workflow table
-- takes about 1 second
UPDATE approvals_v2.denormalized_approval_workflow
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM approvals_v2.denormalized_approval_workflow
TABLESAMPLE SYSTEM (0.0005)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 2000 rows or 1 MB in the device_fingerprint_event table
-- takes about 5 seconds
UPDATE authentication.device_fingerprint_event
SET is_incognito = NOT is_incognito
WHERE ctid in (
SELECT ctid
FROM authentication.device_fingerprint_event
TABLESAMPLE SYSTEM (0.001)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 4000 rows or 600 kb in the edges table
-- takes about 1 second
UPDATE workflows.edges
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM workflows.edges
TABLESAMPLE SYSTEM (0.0005)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 10000 rows or 200 KB in the heron_transaction_enriched_log table
-- takes about 1 minutes
UPDATE heron.heron_transaction_enriched_log
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM heron.heron_transaction_enriched_log
TABLESAMPLE SYSTEM (0.0005)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 4000 rows or 1 MB in the heron_transaction_enrichment_requests table
-- takes about 2 minutes
UPDATE heron.heron_transaction_enrichment_requests
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM heron.heron_transaction_enrichment_requests
TABLESAMPLE SYSTEM (0.0002)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 6000 rows or 600 kb in the hotel_rate_mapping table
-- takes about 1 second
UPDATE booking_inventory.hotel_rate_mapping
SET created_at = now()
WHERE ctid in (
SELECT ctid
FROM booking_inventory.hotel_rate_mapping
TABLESAMPLE SYSTEM (0.0005)
);

View File

@@ -1,9 +0,0 @@
-- update approximately 2000 rows or 1 MB in the incoming_webhooks table
-- takes about 5 seconds
UPDATE webhook.incoming_webhooks
SET is_body_encrypted = NOT is_body_encrypted
WHERE ctid in (
SELECT ctid
FROM webhook.incoming_webhooks
TABLESAMPLE SYSTEM (0.0002)
);

Some files were not shown because too many files have changed in this diff Show More