mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Compare commits
1 Commits
conrad/com
...
ephemerals
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0b23f841d |
11
.github/workflows/_build-and-test-locally.yml
vendored
11
.github/workflows/_build-and-test-locally.yml
vendored
@@ -38,11 +38,6 @@ on:
|
||||
required: false
|
||||
default: 1
|
||||
type: number
|
||||
rerun-failed:
|
||||
description: 'rerun failed tests to ignore flaky tests'
|
||||
required: false
|
||||
default: true
|
||||
type: boolean
|
||||
|
||||
defaults:
|
||||
run:
|
||||
@@ -384,7 +379,7 @@ jobs:
|
||||
- name: Pytest regression tests
|
||||
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
|
||||
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
|
||||
with:
|
||||
build_type: ${{ inputs.build-type }}
|
||||
test_selection: regress
|
||||
@@ -392,14 +387,14 @@ jobs:
|
||||
run_with_real_s3: true
|
||||
real_s3_bucket: neon-github-ci-tests
|
||||
real_s3_region: eu-central-1
|
||||
rerun_failed: ${{ inputs.rerun-failed }}
|
||||
rerun_failed: ${{ inputs.test-run-count == 1 }}
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
sanitizers: ${{ inputs.sanitizers }}
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
|
||||
# Attempt to stop tests gracefully to generate test reports
|
||||
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
|
||||
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
|
||||
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
|
||||
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
|
||||
env:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
|
||||
@@ -58,7 +58,6 @@ jobs:
|
||||
test-cfg: ${{ inputs.pg-versions }}
|
||||
test-selection: ${{ inputs.test-selection }}
|
||||
test-run-count: ${{ fromJson(inputs.run-count) }}
|
||||
rerun-failed: false
|
||||
secrets: inherit
|
||||
|
||||
create-test-report:
|
||||
|
||||
22
.github/workflows/build_and_test.yml
vendored
22
.github/workflows/build_and_test.yml
vendored
@@ -199,28 +199,6 @@ jobs:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
secrets: inherit
|
||||
|
||||
validate-compute-manifest:
|
||||
runs-on: ubuntu-22.04
|
||||
needs: [ meta, check-permissions ]
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
|
||||
with:
|
||||
node-version: '24'
|
||||
|
||||
- name: Validate manifest against schema
|
||||
run: |
|
||||
make -C compute manifest-schema-validation
|
||||
|
||||
build-and-test-locally:
|
||||
needs: [ meta, build-build-tools-image ]
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
|
||||
151
.github/workflows/build_and_test_fully.yml
vendored
151
.github/workflows/build_and_test_fully.yml
vendored
@@ -1,151 +0,0 @@
|
||||
name: Build and Test Fully
|
||||
|
||||
on:
|
||||
schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '0 3 * * *' # run once a day, timezone is utc
|
||||
workflow_dispatch:
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
|
||||
jobs:
|
||||
tag:
|
||||
runs-on: [ self-hosted, small ]
|
||||
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
|
||||
outputs:
|
||||
build-tag: ${{steps.build-tag.outputs.tag}}
|
||||
|
||||
steps:
|
||||
# Need `fetch-depth: 0` to count the number of commits in the branch
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Get build tag
|
||||
run: |
|
||||
echo run:$GITHUB_RUN_ID
|
||||
echo ref:$GITHUB_REF_NAME
|
||||
echo rev:$(git rev-list --count HEAD)
|
||||
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
|
||||
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
|
||||
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
|
||||
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
shell: bash
|
||||
id: build-tag
|
||||
|
||||
build-build-tools-image:
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
secrets: inherit
|
||||
|
||||
build-and-test-locally:
|
||||
needs: [ tag, build-build-tools-image ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
arch: [ x64, arm64 ]
|
||||
build-type: [ debug, release ]
|
||||
uses: ./.github/workflows/_build-and-test-locally.yml
|
||||
with:
|
||||
arch: ${{ matrix.arch }}
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
build-tag: ${{ needs.tag.outputs.build-tag }}
|
||||
build-type: ${{ matrix.build-type }}
|
||||
rerun-failed: false
|
||||
test-cfg: '[{"pg_version":"v14", "lfc_state": "with-lfc"},
|
||||
{"pg_version":"v15", "lfc_state": "with-lfc"},
|
||||
{"pg_version":"v16", "lfc_state": "with-lfc"},
|
||||
{"pg_version":"v17", "lfc_state": "with-lfc"},
|
||||
{"pg_version":"v14", "lfc_state": "without-lfc"},
|
||||
{"pg_version":"v15", "lfc_state": "without-lfc"},
|
||||
{"pg_version":"v16", "lfc_state": "without-lfc"},
|
||||
{"pg_version":"v17", "lfc_state": "withouts-lfc"}]'
|
||||
secrets: inherit
|
||||
|
||||
|
||||
create-test-report:
|
||||
needs: [ build-and-test-locally, build-build-tools-image ]
|
||||
if: ${{ !cancelled() }}
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
contents: write
|
||||
pull-requests: write
|
||||
outputs:
|
||||
report-url: ${{ steps.create-allure-report.outputs.report-url }}
|
||||
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Create Allure report
|
||||
if: ${{ !cancelled() }}
|
||||
id: create-allure-report
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
with:
|
||||
store-test-results-into-db: true
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
|
||||
- uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
|
||||
if: ${{ !cancelled() }}
|
||||
with:
|
||||
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
|
||||
retries: 5
|
||||
script: |
|
||||
const report = {
|
||||
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
|
||||
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
|
||||
}
|
||||
|
||||
const coverage = {}
|
||||
|
||||
const script = require("./scripts/comment-test-report.js")
|
||||
await script({
|
||||
github,
|
||||
context,
|
||||
fetch,
|
||||
report,
|
||||
coverage,
|
||||
})
|
||||
@@ -79,7 +79,6 @@ jobs:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
build-tag: ${{ needs.tag.outputs.build-tag }}
|
||||
build-type: ${{ matrix.build-type }}
|
||||
rerun-failed: false
|
||||
test-cfg: '[{"pg_version":"v17"}]'
|
||||
sanitizers: enabled
|
||||
secrets: inherit
|
||||
|
||||
3
compute/.gitignore
vendored
3
compute/.gitignore
vendored
@@ -3,6 +3,3 @@ etc/neon_collector.yml
|
||||
etc/neon_collector_autoscaling.yml
|
||||
etc/sql_exporter.yml
|
||||
etc/sql_exporter_autoscaling.yml
|
||||
|
||||
# Node.js dependencies
|
||||
node_modules/
|
||||
|
||||
@@ -48,11 +48,3 @@ jsonnetfmt-test:
|
||||
.PHONY: jsonnetfmt-format
|
||||
jsonnetfmt-format:
|
||||
jsonnetfmt --in-place $(jsonnet_files)
|
||||
|
||||
.PHONY: manifest-schema-validation
|
||||
manifest-schema-validation: node_modules
|
||||
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
|
||||
|
||||
node_modules: package.json
|
||||
npm install
|
||||
touch node_modules
|
||||
|
||||
@@ -1722,29 +1722,11 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
|
||||
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
|
||||
#
|
||||
#########################################################################################
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools-plan
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
WORKDIR /home/nonroot
|
||||
USER nonroot
|
||||
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
USER nonroot
|
||||
|
||||
COPY --from=compute-tools-plan /home/nonroot/recipe.json recipe.json
|
||||
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/target \
|
||||
mold -run cargo chef cook --locked --profile release-line-debug-size-lto --recipe-path recipe.json
|
||||
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
|
||||
|
||||
@@ -1,209 +0,0 @@
|
||||
{
|
||||
"$schema": "https://json-schema.org/draft/2020-12/schema",
|
||||
"title": "Neon Compute Manifest Schema",
|
||||
"description": "Schema for Neon compute node configuration manifest",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pg_settings": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"common": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"client_connection_check_interval": {
|
||||
"type": "string",
|
||||
"description": "Check for client disconnection interval in milliseconds"
|
||||
},
|
||||
"effective_io_concurrency": {
|
||||
"type": "string",
|
||||
"description": "Effective IO concurrency setting"
|
||||
},
|
||||
"fsync": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to force fsync to disk"
|
||||
},
|
||||
"hot_standby": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether hot standby is enabled"
|
||||
},
|
||||
"idle_in_transaction_session_timeout": {
|
||||
"type": "string",
|
||||
"description": "Timeout for idle transactions in milliseconds"
|
||||
},
|
||||
"listen_addresses": {
|
||||
"type": "string",
|
||||
"description": "Addresses to listen on"
|
||||
},
|
||||
"log_connections": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to log connections"
|
||||
},
|
||||
"log_disconnections": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to log disconnections"
|
||||
},
|
||||
"log_temp_files": {
|
||||
"type": "string",
|
||||
"description": "Size threshold for logging temporary files in KB"
|
||||
},
|
||||
"log_error_verbosity": {
|
||||
"type": "string",
|
||||
"enum": ["terse", "verbose", "default"],
|
||||
"description": "Error logging verbosity level"
|
||||
},
|
||||
"log_min_error_statement": {
|
||||
"type": "string",
|
||||
"description": "Minimum error level for statement logging"
|
||||
},
|
||||
"maintenance_io_concurrency": {
|
||||
"type": "string",
|
||||
"description": "Maintenance IO concurrency setting"
|
||||
},
|
||||
"max_connections": {
|
||||
"type": "string",
|
||||
"description": "Maximum number of connections"
|
||||
},
|
||||
"max_replication_flush_lag": {
|
||||
"type": "string",
|
||||
"description": "Maximum replication flush lag"
|
||||
},
|
||||
"max_replication_slots": {
|
||||
"type": "string",
|
||||
"description": "Maximum number of replication slots"
|
||||
},
|
||||
"max_replication_write_lag": {
|
||||
"type": "string",
|
||||
"description": "Maximum replication write lag"
|
||||
},
|
||||
"max_wal_senders": {
|
||||
"type": "string",
|
||||
"description": "Maximum number of WAL senders"
|
||||
},
|
||||
"max_wal_size": {
|
||||
"type": "string",
|
||||
"description": "Maximum WAL size"
|
||||
},
|
||||
"neon.unstable_extensions": {
|
||||
"type": "string",
|
||||
"description": "List of unstable extensions"
|
||||
},
|
||||
"neon.protocol_version": {
|
||||
"type": "string",
|
||||
"description": "Neon protocol version"
|
||||
},
|
||||
"password_encryption": {
|
||||
"type": "string",
|
||||
"description": "Password encryption method"
|
||||
},
|
||||
"restart_after_crash": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to restart after crash"
|
||||
},
|
||||
"superuser_reserved_connections": {
|
||||
"type": "string",
|
||||
"description": "Number of reserved connections for superuser"
|
||||
},
|
||||
"synchronous_standby_names": {
|
||||
"type": "string",
|
||||
"description": "Names of synchronous standby servers"
|
||||
},
|
||||
"wal_keep_size": {
|
||||
"type": "string",
|
||||
"description": "WAL keep size"
|
||||
},
|
||||
"wal_level": {
|
||||
"type": "string",
|
||||
"description": "WAL level"
|
||||
},
|
||||
"wal_log_hints": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to log hints in WAL"
|
||||
},
|
||||
"wal_sender_timeout": {
|
||||
"type": "string",
|
||||
"description": "WAL sender timeout in milliseconds"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"client_connection_check_interval",
|
||||
"effective_io_concurrency",
|
||||
"fsync",
|
||||
"hot_standby",
|
||||
"idle_in_transaction_session_timeout",
|
||||
"listen_addresses",
|
||||
"log_connections",
|
||||
"log_disconnections",
|
||||
"log_temp_files",
|
||||
"log_error_verbosity",
|
||||
"log_min_error_statement",
|
||||
"maintenance_io_concurrency",
|
||||
"max_connections",
|
||||
"max_replication_flush_lag",
|
||||
"max_replication_slots",
|
||||
"max_replication_write_lag",
|
||||
"max_wal_senders",
|
||||
"max_wal_size",
|
||||
"neon.unstable_extensions",
|
||||
"neon.protocol_version",
|
||||
"password_encryption",
|
||||
"restart_after_crash",
|
||||
"superuser_reserved_connections",
|
||||
"synchronous_standby_names",
|
||||
"wal_keep_size",
|
||||
"wal_level",
|
||||
"wal_log_hints",
|
||||
"wal_sender_timeout"
|
||||
]
|
||||
},
|
||||
"replica": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"hot_standby": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether hot standby is enabled for replicas"
|
||||
}
|
||||
},
|
||||
"required": ["hot_standby"]
|
||||
},
|
||||
"per_version": {
|
||||
"type": "object",
|
||||
"patternProperties": {
|
||||
"^1[4-7]$": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"common": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"io_combine_limit": {
|
||||
"type": "string",
|
||||
"description": "IO combine limit"
|
||||
}
|
||||
}
|
||||
},
|
||||
"replica": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"recovery_prefetch": {
|
||||
"type": "string",
|
||||
"enum": ["on", "off"],
|
||||
"description": "Whether to enable recovery prefetch for PostgreSQL replicas"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["common", "replica", "per_version"]
|
||||
}
|
||||
},
|
||||
"required": ["pg_settings"]
|
||||
}
|
||||
@@ -105,17 +105,17 @@ pg_settings:
|
||||
# Neon hot standby ignores pages that are not in the shared_buffers
|
||||
recovery_prefetch: "off"
|
||||
16:
|
||||
common: {}
|
||||
common:
|
||||
replica:
|
||||
# prefetching of blocks referenced in WAL doesn't make sense for us
|
||||
# Neon hot standby ignores pages that are not in the shared_buffers
|
||||
recovery_prefetch: "off"
|
||||
15:
|
||||
common: {}
|
||||
common:
|
||||
replica:
|
||||
# prefetching of blocks referenced in WAL doesn't make sense for us
|
||||
# Neon hot standby ignores pages that are not in the shared_buffers
|
||||
recovery_prefetch: "off"
|
||||
14:
|
||||
common: {}
|
||||
replica: {}
|
||||
common:
|
||||
replica:
|
||||
|
||||
37
compute/package-lock.json
generated
37
compute/package-lock.json
generated
@@ -1,37 +0,0 @@
|
||||
{
|
||||
"name": "neon-compute",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "neon-compute",
|
||||
"dependencies": {
|
||||
"@sourcemeta/jsonschema": "9.3.4"
|
||||
}
|
||||
},
|
||||
"node_modules/@sourcemeta/jsonschema": {
|
||||
"version": "9.3.4",
|
||||
"resolved": "https://registry.npmjs.org/@sourcemeta/jsonschema/-/jsonschema-9.3.4.tgz",
|
||||
"integrity": "sha512-hkujfkZAIGXUs4U//We9faZW8LZ4/H9LqagRYsFSulH/VLcKPNhZyCTGg7AhORuzm27zqENvKpnX4g2FzudYFw==",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
],
|
||||
"license": "AGPL-3.0",
|
||||
"os": [
|
||||
"darwin",
|
||||
"linux",
|
||||
"win32"
|
||||
],
|
||||
"bin": {
|
||||
"jsonschema": "cli.js"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sourcemeta"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
{
|
||||
"name": "neon-compute",
|
||||
"private": true,
|
||||
"dependencies": {
|
||||
"@sourcemeta/jsonschema": "9.3.4"
|
||||
}
|
||||
}
|
||||
@@ -354,6 +354,11 @@ impl ComputeNode {
|
||||
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
|
||||
// Unset them via connection string options before connecting to the database.
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
//
|
||||
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
|
||||
// as well. After rolling out this code, we can remove this parameter from control plane.
|
||||
// In the meantime, double-passing is fine, the last value is applied.
|
||||
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
let options = match conn_conf.get_options() {
|
||||
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
|
||||
|
||||
@@ -18,7 +18,7 @@ use clap::Parser;
|
||||
use compute_api::requests::ComputeClaimsScope;
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::broker::StorageBroker;
|
||||
use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol};
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::{
|
||||
@@ -605,14 +605,6 @@ struct EndpointCreateCmdArgs {
|
||||
#[clap(long, help = "Postgres version")]
|
||||
pg_version: u32,
|
||||
|
||||
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
|
||||
///
|
||||
/// Specified on creation such that it's retained across reconfiguration and restarts.
|
||||
///
|
||||
/// NB: not yet supported by computes.
|
||||
#[clap(long)]
|
||||
grpc: bool,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "If set, the node will be a hot replica on the specified timeline",
|
||||
@@ -1459,7 +1451,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
args.internal_http_port,
|
||||
args.pg_version,
|
||||
mode,
|
||||
args.grpc,
|
||||
!args.update_catalog,
|
||||
false,
|
||||
)?;
|
||||
@@ -1500,20 +1491,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
|
||||
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
|
||||
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
|
||||
// Use gRPC if requested.
|
||||
let pageserver = if endpoint.grpc {
|
||||
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
|
||||
let (host, port) = parse_host_port(grpc_addr)?;
|
||||
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
|
||||
(PageserverProtocol::Grpc, host, port)
|
||||
} else {
|
||||
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
|
||||
let port = port.unwrap_or(5432);
|
||||
(PageserverProtocol::Libpq, host, port)
|
||||
};
|
||||
// If caller is telling us what pageserver to use, this is not a tenant which is
|
||||
// fully managed by storage controller, therefore not sharded.
|
||||
(vec![pageserver], DEFAULT_STRIPE_SIZE)
|
||||
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
|
||||
(
|
||||
vec![(parsed.0, parsed.1.unwrap_or(5432))],
|
||||
// If caller is telling us what pageserver to use, this is not a tenant which is
|
||||
// full managed by storage controller, therefore not sharded.
|
||||
DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
} else {
|
||||
// Look up the currently attached location of the tenant, and its striping metadata,
|
||||
// to pass these on to postgres.
|
||||
@@ -1532,20 +1516,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.await?;
|
||||
}
|
||||
|
||||
let pageserver = if endpoint.grpc {
|
||||
(
|
||||
PageserverProtocol::Grpc,
|
||||
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
|
||||
shard.listen_grpc_port.expect("no gRPC port"),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr)?,
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
};
|
||||
anyhow::Ok(pageserver)
|
||||
anyhow::Ok((
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
))
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
@@ -1600,19 +1575,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.get(endpoint_id.as_str())
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
|
||||
let conf = env.get_pageserver_conf(ps_id)?;
|
||||
// Use gRPC if requested.
|
||||
let pageserver = if endpoint.grpc {
|
||||
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
|
||||
let (host, port) = parse_host_port(grpc_addr)?;
|
||||
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
|
||||
(PageserverProtocol::Grpc, host, port)
|
||||
} else {
|
||||
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
|
||||
let port = port.unwrap_or(5432);
|
||||
(PageserverProtocol::Libpq, host, port)
|
||||
};
|
||||
vec![pageserver]
|
||||
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
|
||||
vec![(
|
||||
pageserver.pg_connection_config.host().clone(),
|
||||
pageserver.pg_connection_config.port(),
|
||||
)]
|
||||
} else {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
@@ -1621,21 +1588,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
// Use gRPC if requested.
|
||||
if endpoint.grpc {
|
||||
(
|
||||
PageserverProtocol::Grpc,
|
||||
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
|
||||
.expect("bad hostname"),
|
||||
shard.listen_grpc_port.expect("no gRPC port"),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
}
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported malformed host"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
@@ -37,7 +37,6 @@
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Display;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
@@ -77,6 +76,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
use crate::storage_controller::StorageController;
|
||||
|
||||
// contents of a endpoint.json file
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
@@ -89,7 +89,6 @@ pub struct EndpointConf {
|
||||
external_http_port: u16,
|
||||
internal_http_port: u16,
|
||||
pg_version: u32,
|
||||
grpc: bool,
|
||||
skip_pg_catalog_updates: bool,
|
||||
reconfigure_concurrency: usize,
|
||||
drop_subscriptions_before_start: bool,
|
||||
@@ -193,7 +192,6 @@ impl ComputeControlPlane {
|
||||
internal_http_port: Option<u16>,
|
||||
pg_version: u32,
|
||||
mode: ComputeMode,
|
||||
grpc: bool,
|
||||
skip_pg_catalog_updates: bool,
|
||||
drop_subscriptions_before_start: bool,
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
@@ -228,7 +226,6 @@ impl ComputeControlPlane {
|
||||
// we also skip catalog updates in the cloud.
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
grpc,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
cluster: None,
|
||||
@@ -247,7 +244,6 @@ impl ComputeControlPlane {
|
||||
internal_http_port,
|
||||
pg_port,
|
||||
pg_version,
|
||||
grpc,
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
@@ -302,8 +298,6 @@ pub struct Endpoint {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub mode: ComputeMode,
|
||||
/// If true, the endpoint should use gRPC to communicate with Pageservers.
|
||||
pub grpc: bool,
|
||||
|
||||
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
|
||||
pub pg_address: SocketAddr,
|
||||
@@ -339,7 +333,7 @@ pub enum EndpointStatus {
|
||||
RunningNoPidfile,
|
||||
}
|
||||
|
||||
impl Display for EndpointStatus {
|
||||
impl std::fmt::Display for EndpointStatus {
|
||||
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
let s = match self {
|
||||
Self::Running => "running",
|
||||
@@ -351,29 +345,6 @@ impl Display for EndpointStatus {
|
||||
}
|
||||
}
|
||||
|
||||
/// Protocol used to connect to a Pageserver.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum PageserverProtocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
impl PageserverProtocol {
|
||||
/// Returns the URL scheme for the protocol, used in connstrings.
|
||||
pub fn scheme(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Libpq => "postgresql",
|
||||
Self::Grpc => "grpc",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for PageserverProtocol {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(self.scheme())
|
||||
}
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
|
||||
if !entry.file_type()?.is_dir() {
|
||||
@@ -409,7 +380,6 @@ impl Endpoint {
|
||||
mode: conf.mode,
|
||||
tenant_id: conf.tenant_id,
|
||||
pg_version: conf.pg_version,
|
||||
grpc: conf.grpc,
|
||||
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
|
||||
reconfigure_concurrency: conf.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
|
||||
@@ -638,10 +608,10 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
|
||||
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
|
||||
pageservers
|
||||
.iter()
|
||||
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
|
||||
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
}
|
||||
@@ -686,7 +656,7 @@ impl Endpoint {
|
||||
endpoint_storage_addr: String,
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(PageserverProtocol, Host, u16)>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_base_url: Option<&String>,
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
@@ -971,12 +941,10 @@ impl Endpoint {
|
||||
|
||||
pub async fn reconfigure(
|
||||
&self,
|
||||
pageservers: Vec<(PageserverProtocol, Host, u16)>,
|
||||
mut pageservers: Vec<(Host, u16)>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
) -> Result<()> {
|
||||
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
|
||||
|
||||
let (mut spec, compute_ctl_config) = {
|
||||
let config_path = self.endpoint_path().join("config.json");
|
||||
let file = std::fs::File::open(config_path)?;
|
||||
@@ -988,7 +956,25 @@ impl Endpoint {
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
// If we weren't given explicit pageservers, query the storage controller
|
||||
if pageservers.is_empty() {
|
||||
let storage_controller = StorageController::from_env(&self.env);
|
||||
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
|
||||
pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
}
|
||||
|
||||
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstr.is_empty());
|
||||
spec.pageserver_connstring = Some(pageserver_connstr);
|
||||
if stripe_size.is_some() {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
|
||||
@@ -16,7 +16,6 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
|
||||
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
@@ -253,10 +252,9 @@ impl PageServerNode {
|
||||
// the storage controller
|
||||
let metadata_path = datadir.join("metadata.json");
|
||||
|
||||
let http_host = "localhost".to_string();
|
||||
let (_, http_port) =
|
||||
let (_http_host, http_port) =
|
||||
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
|
||||
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
|
||||
let http_port = http_port.unwrap_or(9898);
|
||||
|
||||
let https_port = match self.conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
@@ -267,13 +265,6 @@ impl PageServerNode {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let (mut grpc_host, mut grpc_port) = (None, None);
|
||||
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
|
||||
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
|
||||
grpc_host = Some("localhost".to_string());
|
||||
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
|
||||
}
|
||||
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
@@ -282,9 +273,7 @@ impl PageServerNode {
|
||||
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: self.pg_connection_config.port(),
|
||||
grpc_host,
|
||||
grpc_port,
|
||||
http_host,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port,
|
||||
https_port,
|
||||
other: HashMap::from([(
|
||||
|
||||
@@ -36,10 +36,6 @@ enum Command {
|
||||
listen_pg_addr: String,
|
||||
#[arg(long)]
|
||||
listen_pg_port: u16,
|
||||
#[arg(long)]
|
||||
listen_grpc_addr: Option<String>,
|
||||
#[arg(long)]
|
||||
listen_grpc_port: Option<u16>,
|
||||
|
||||
#[arg(long)]
|
||||
listen_http_addr: String,
|
||||
@@ -422,8 +418,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
@@ -437,8 +431,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
|
||||
@@ -12,7 +12,6 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
|
||||
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
@@ -25,17 +24,16 @@ use utils::logging::LogFormat;
|
||||
use crate::models::{ImageCompressionAlgorithm, LsnLease};
|
||||
|
||||
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
|
||||
// as a separate structure. This information is not needed by the pageserver
|
||||
// as a separate structure. This information is not neeed by the pageserver
|
||||
// itself, it is only used for registering the pageserver with the control
|
||||
// plane and/or storage controller.
|
||||
//
|
||||
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct NodeMetadata {
|
||||
#[serde(rename = "host")]
|
||||
pub postgres_host: String,
|
||||
#[serde(rename = "port")]
|
||||
pub postgres_port: u16,
|
||||
pub grpc_host: Option<String>,
|
||||
pub grpc_port: Option<u16>,
|
||||
pub http_host: String,
|
||||
pub http_port: u16,
|
||||
pub https_port: Option<u16>,
|
||||
@@ -46,23 +44,6 @@ pub struct NodeMetadata {
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
impl Display for NodeMetadata {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"postgresql://{}:{} ",
|
||||
self.postgres_host, self.postgres_port
|
||||
)?;
|
||||
if let Some(grpc_host) = &self.grpc_host {
|
||||
let grpc_port = self.grpc_port.unwrap_or_default();
|
||||
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
|
||||
}
|
||||
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
|
||||
write!(f, "other:{:?}", self.other)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// PostHog integration config.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PostHogConfig {
|
||||
@@ -356,21 +337,16 @@ pub struct TimelineImportConfig {
|
||||
pub struct BasebackupCacheConfig {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub cleanup_period: Duration,
|
||||
/// Maximum total size of basebackup cache entries on disk in bytes.
|
||||
/// The cache may slightly exceed this limit because we do not know
|
||||
/// the exact size of the cache entry untill it's written to disk.
|
||||
pub max_total_size_bytes: u64,
|
||||
// TODO(diko): support max_entry_size_bytes.
|
||||
// pub max_entry_size_bytes: u64,
|
||||
pub max_size_entries: usize,
|
||||
// FIXME: Support max_size_bytes.
|
||||
// pub max_size_bytes: usize,
|
||||
pub max_size_entries: i64,
|
||||
}
|
||||
|
||||
impl Default for BasebackupCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
cleanup_period: Duration::from_secs(60),
|
||||
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
|
||||
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
|
||||
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
|
||||
max_size_entries: 1000,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,8 +14,6 @@ fn test_node_metadata_v1_backward_compatibilty() {
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: None,
|
||||
grpc_port: None,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: None,
|
||||
@@ -39,35 +37,6 @@ fn test_node_metadata_v2_backward_compatibilty() {
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: None,
|
||||
grpc_port: None,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: Some(123),
|
||||
other: HashMap::new(),
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_node_metadata_v3_backward_compatibilty() {
|
||||
let v3 = serde_json::to_vec(&serde_json::json!({
|
||||
"host": "localhost",
|
||||
"port": 23,
|
||||
"grpc_host": "localhost",
|
||||
"grpc_port": 51,
|
||||
"http_host": "localhost",
|
||||
"http_port": 42,
|
||||
"https_port": 123,
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: Some("localhost".to_string()),
|
||||
grpc_port: Some(51),
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: Some(123),
|
||||
|
||||
@@ -52,8 +52,6 @@ pub struct NodeRegisterRequest {
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
@@ -103,8 +101,6 @@ pub struct TenantLocateResponseShard {
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
@@ -156,8 +152,6 @@ pub struct NodeDescribeResponse {
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
//!
|
||||
//! # local timeline dir
|
||||
//! ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
|
||||
//! grep "__" | cargo run --release --bin pagectl draw-timeline > out.svg
|
||||
//! grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
|
||||
//!
|
||||
//! # Layer map dump from `/v1/tenant/$TENANT/timeline/$TIMELINE/layer`
|
||||
//! (jq -r '.historic_layers[] | .layer_file_name' | cargo run -p pagectl draw-timeline) < layer-map.json > out.svg
|
||||
@@ -81,11 +81,7 @@ fn build_coordinate_compression_map<T: Ord + Copy>(coords: Vec<T>) -> BTreeMap<T
|
||||
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
|
||||
let split: Vec<&str> = name.split("__").collect();
|
||||
let keys: Vec<&str> = split[0].split('-').collect();
|
||||
|
||||
// Remove the temporary file extension, e.g., remove the `.d20a.___temp` part from the following filename:
|
||||
// 000000067F000040490000404A00441B0000-000000067F000040490000404A00441B4000__000043483A34CE00.d20a.___temp
|
||||
let lsns = split[1].split('.').collect::<Vec<&str>>()[0];
|
||||
let mut lsns: Vec<&str> = lsns.split('-').collect();
|
||||
let mut lsns: Vec<&str> = split[1].split('-').collect();
|
||||
|
||||
// The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use pageserver::{page_cache, virtual_file};
|
||||
use pageserver_api::key::Key;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use crate::layer_map_analyzer::{LayerFile, parse_filename};
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
#[derive(Subcommand)]
|
||||
pub(crate) enum LayerCmd {
|
||||
@@ -38,8 +38,6 @@ pub(crate) enum LayerCmd {
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
/// Dump all information of a layer file locally
|
||||
DumpLayerLocal { path: PathBuf },
|
||||
RewriteSummary {
|
||||
layer_file_path: Utf8PathBuf,
|
||||
#[clap(long)]
|
||||
@@ -133,7 +131,15 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
}
|
||||
|
||||
for (idx, layer_file) in to_print {
|
||||
print_layer_file(idx, &layer_file);
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -153,7 +159,16 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
let layer = layer?;
|
||||
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
|
||||
if *id == idx {
|
||||
print_layer_file(idx, &layer_file);
|
||||
// TODO(chi): dedup code
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path(), &ctx).await?;
|
||||
@@ -168,18 +183,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::DumpLayerLocal { path } => {
|
||||
if let Ok(layer_file) = parse_filename(path.file_name().unwrap().to_str().unwrap()) {
|
||||
print_layer_file(0, &layer_file);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(path, &ctx).await?;
|
||||
} else {
|
||||
read_image_file(path, &ctx).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::RewriteSummary {
|
||||
layer_file_path,
|
||||
new_tenant_id,
|
||||
@@ -244,15 +247,3 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn print_layer_file(idx: usize, layer_file: &LayerFile) {
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -102,10 +102,10 @@ message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup.
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
|
||||
uint64 lsn = 1;
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
|
||||
use crate::proto;
|
||||
|
||||
/// A protocol error. Typically returned via try_from() or try_into().
|
||||
#[derive(thiserror::Error, Clone, Debug)]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ProtocolError {
|
||||
#[error("field '{0}' has invalid value '{1}'")]
|
||||
Invalid(&'static str, String),
|
||||
@@ -182,28 +182,33 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests a base backup.
|
||||
/// Requests a base backup at a given LSN.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetBaseBackupRequest {
|
||||
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
|
||||
pub lsn: Option<Lsn>,
|
||||
/// The LSN to fetch a base backup at.
|
||||
pub read_lsn: ReadLsn,
|
||||
/// If true, logical replication slots will not be created.
|
||||
pub replica: bool,
|
||||
}
|
||||
|
||||
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
fn from(pb: proto::GetBaseBackupRequest) -> Self {
|
||||
Self {
|
||||
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
|
||||
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
read_lsn: pb
|
||||
.read_lsn
|
||||
.ok_or(ProtocolError::Missing("read_lsn"))?
|
||||
.try_into()?,
|
||||
replica: pb.replica,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
|
||||
fn from(request: GetBaseBackupRequest) -> Self {
|
||||
Self {
|
||||
lsn: request.lsn.unwrap_or_default().0,
|
||||
read_lsn: Some(request.read_lsn.into()),
|
||||
replica: request.replica,
|
||||
}
|
||||
}
|
||||
@@ -417,39 +422,6 @@ impl From<GetPageResponse> for proto::GetPageResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPageResponse {
|
||||
/// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
|
||||
/// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
|
||||
/// GetPageResponse with a non-OK status code instead.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn try_from_status(
|
||||
status: tonic::Status,
|
||||
request_id: RequestID,
|
||||
) -> Result<Self, tonic::Status> {
|
||||
// We shouldn't see an OK status here, because we're emitting an error.
|
||||
debug_assert_ne!(status.code(), tonic::Code::Ok);
|
||||
if status.code() == tonic::Code::Ok {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"unexpected OK status: {status:?}",
|
||||
)));
|
||||
}
|
||||
|
||||
// If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
|
||||
// error and we should return a tonic::Status to terminate the stream.
|
||||
let Ok(status_code) = status.code().try_into() else {
|
||||
return Err(status);
|
||||
};
|
||||
|
||||
// Return a GetPageResponse for the status.
|
||||
Ok(Self {
|
||||
request_id,
|
||||
status_code,
|
||||
reason: Some(status.message().to_string()),
|
||||
page_images: Vec::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A GetPage response status code.
|
||||
///
|
||||
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
@@ -513,42 +485,8 @@ impl From<GetPageStatusCode> for i32 {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<tonic::Code> for GetPageStatusCode {
|
||||
type Error = tonic::Code;
|
||||
|
||||
fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
|
||||
use tonic::Code;
|
||||
|
||||
let status_code = match code {
|
||||
Code::Ok => Self::Ok,
|
||||
|
||||
// These are per-request errors, which should be returned as GetPageResponses.
|
||||
Code::AlreadyExists => Self::InvalidRequest,
|
||||
Code::DataLoss => Self::InternalError,
|
||||
Code::FailedPrecondition => Self::InvalidRequest,
|
||||
Code::InvalidArgument => Self::InvalidRequest,
|
||||
Code::Internal => Self::InternalError,
|
||||
Code::NotFound => Self::NotFound,
|
||||
Code::OutOfRange => Self::InvalidRequest,
|
||||
Code::ResourceExhausted => Self::SlowDown,
|
||||
|
||||
// These should terminate the stream by returning a tonic::Status.
|
||||
Code::Aborted
|
||||
| Code::Cancelled
|
||||
| Code::DeadlineExceeded
|
||||
| Code::PermissionDenied
|
||||
| Code::Unauthenticated
|
||||
| Code::Unavailable
|
||||
| Code::Unimplemented
|
||||
| Code::Unknown => return Err(code),
|
||||
};
|
||||
Ok(status_code)
|
||||
}
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetRelSizeRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub rel: RelTag,
|
||||
@@ -592,7 +530,6 @@ impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
|
||||
}
|
||||
|
||||
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct GetSlruSegmentRequest {
|
||||
pub read_lsn: ReadLsn,
|
||||
pub kind: SlruKind,
|
||||
|
||||
@@ -19,10 +19,7 @@ use utils::{
|
||||
use crate::{
|
||||
basebackup::send_basebackup_tarball,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
metrics::{
|
||||
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
|
||||
BASEBACKUP_CACHE_SIZE,
|
||||
},
|
||||
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
|
||||
task_mgr::TaskKind,
|
||||
tenant::{
|
||||
Timeline,
|
||||
@@ -39,13 +36,8 @@ pub struct BasebackupPrepareRequest {
|
||||
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
|
||||
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CacheEntry {
|
||||
/// LSN at which the basebackup was taken.
|
||||
lsn: Lsn,
|
||||
/// Size of the basebackup archive in bytes.
|
||||
size_bytes: u64,
|
||||
}
|
||||
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
|
||||
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
|
||||
|
||||
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
|
||||
///
|
||||
@@ -61,12 +53,21 @@ struct CacheEntry {
|
||||
/// and ~1 RPS for get requests.
|
||||
pub struct BasebackupCache {
|
||||
data_dir: Utf8PathBuf,
|
||||
config: BasebackupCacheConfig,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remove_entry_sender: BasebackupRemoveEntrySender,
|
||||
|
||||
entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
|
||||
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
|
||||
read_hit_count: GenericCounter<AtomicU64>,
|
||||
read_miss_count: GenericCounter<AtomicU64>,
|
||||
read_err_count: GenericCounter<AtomicU64>,
|
||||
|
||||
prepare_ok_count: GenericCounter<AtomicU64>,
|
||||
prepare_skip_count: GenericCounter<AtomicU64>,
|
||||
prepare_err_count: GenericCounter<AtomicU64>,
|
||||
}
|
||||
|
||||
impl BasebackupCache {
|
||||
@@ -82,32 +83,35 @@ impl BasebackupCache {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let enabled = config.is_some();
|
||||
|
||||
let cache = Arc::new(BasebackupCache {
|
||||
data_dir,
|
||||
config: config.unwrap_or_default(),
|
||||
tenant_manager,
|
||||
remove_entry_sender,
|
||||
|
||||
entries: std::sync::Mutex::new(HashMap::new()),
|
||||
|
||||
cancel,
|
||||
|
||||
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
|
||||
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
|
||||
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
|
||||
|
||||
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
|
||||
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
|
||||
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
|
||||
});
|
||||
|
||||
if let Some(config) = config {
|
||||
let background = BackgroundTask {
|
||||
c: cache.clone(),
|
||||
|
||||
config,
|
||||
tenant_manager,
|
||||
cancel,
|
||||
|
||||
entry_count: 0,
|
||||
total_size_bytes: 0,
|
||||
|
||||
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
|
||||
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
|
||||
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
|
||||
};
|
||||
runtime_handle.spawn(background.run(prepare_receiver));
|
||||
if enabled {
|
||||
runtime_handle.spawn(
|
||||
cache
|
||||
.clone()
|
||||
.background(prepare_receiver, remove_entry_receiver),
|
||||
);
|
||||
}
|
||||
|
||||
cache
|
||||
@@ -125,7 +129,7 @@ impl BasebackupCache {
|
||||
) -> Option<tokio::fs::File> {
|
||||
// Fast path. Check if the entry exists using the in-memory state.
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
|
||||
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
|
||||
self.read_miss_count.inc();
|
||||
return None;
|
||||
}
|
||||
@@ -163,41 +167,9 @@ impl BasebackupCache {
|
||||
self.data_dir
|
||||
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
|
||||
}
|
||||
}
|
||||
|
||||
/// The background task that does the job to prepare basebackups
|
||||
/// and manage the cache entries on disk.
|
||||
/// It is a separate struct from BasebackupCache to allow holding
|
||||
/// a mutable reference to this state without a mutex lock,
|
||||
/// while BasebackupCache is referenced by the clients.
|
||||
struct BackgroundTask {
|
||||
c: Arc<BasebackupCache>,
|
||||
|
||||
config: BasebackupCacheConfig,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
cancel: CancellationToken,
|
||||
|
||||
/// Number of the entries in the cache.
|
||||
/// This counter is used for metrics and applying cache limits.
|
||||
/// It generally should be equal to c.entries.len(), but it's calculated
|
||||
/// pessimistically for abnormal situations: if we encountered some errors
|
||||
/// during removing the entry from disk, we won't decrement this counter to
|
||||
/// make sure that we don't exceed the limit with "trashed" files on the disk.
|
||||
/// It will also count files in the data_dir that are not valid cache entries.
|
||||
entry_count: usize,
|
||||
/// Total size of all the entries on the disk.
|
||||
/// This counter is used for metrics and applying cache limits.
|
||||
/// Similar to entry_count, it is calculated pessimistically for abnormal situations.
|
||||
total_size_bytes: u64,
|
||||
|
||||
prepare_ok_count: GenericCounter<AtomicU64>,
|
||||
prepare_skip_count: GenericCounter<AtomicU64>,
|
||||
prepare_err_count: GenericCounter<AtomicU64>,
|
||||
}
|
||||
|
||||
impl BackgroundTask {
|
||||
fn tmp_dir(&self) -> Utf8PathBuf {
|
||||
self.c.data_dir.join("tmp")
|
||||
self.data_dir.join("tmp")
|
||||
}
|
||||
|
||||
fn entry_tmp_path(
|
||||
@@ -207,7 +179,7 @@ impl BackgroundTask {
|
||||
lsn: Lsn,
|
||||
) -> Utf8PathBuf {
|
||||
self.tmp_dir()
|
||||
.join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
|
||||
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
|
||||
}
|
||||
|
||||
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
|
||||
@@ -236,11 +208,11 @@ impl BackgroundTask {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cleanup(&mut self) -> anyhow::Result<()> {
|
||||
async fn cleanup(&self) -> anyhow::Result<()> {
|
||||
self.clean_tmp_dir().await?;
|
||||
|
||||
// Leave only up-to-date entries.
|
||||
let entries_old = self.c.entries.lock().unwrap().clone();
|
||||
// Remove outdated entries.
|
||||
let entries_old = self.entries.lock().unwrap().clone();
|
||||
let mut entries_new = HashMap::new();
|
||||
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
|
||||
if !tenant_shard_id.is_shard_zero() {
|
||||
@@ -253,32 +225,31 @@ impl BackgroundTask {
|
||||
|
||||
for timeline in tenant.list_timelines() {
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
|
||||
if let Some(entry) = entries_old.get(&tti) {
|
||||
if timeline.get_last_record_lsn() <= entry.lsn {
|
||||
entries_new.insert(tti, entry.clone());
|
||||
if let Some(&entry_lsn) = entries_old.get(&tti) {
|
||||
if timeline.get_last_record_lsn() <= entry_lsn {
|
||||
entries_new.insert(tti, entry_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try to remove all entries that are not up-to-date.
|
||||
for (&tti, entry) in entries_old.iter() {
|
||||
for (&tti, &lsn) in entries_old.iter() {
|
||||
if !entries_new.contains_key(&tti) {
|
||||
self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
|
||||
.await;
|
||||
self.remove_entry_sender
|
||||
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Note: BackgroundTask is the only writer for self.c.entries,
|
||||
// so it couldn't have been modified concurrently.
|
||||
*self.c.entries.lock().unwrap() = entries_new;
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
|
||||
*self.entries.lock().unwrap() = entries_new;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_startup(&mut self) -> anyhow::Result<()> {
|
||||
async fn on_startup(&self) -> anyhow::Result<()> {
|
||||
// Create data_dir if it does not exist.
|
||||
tokio::fs::create_dir_all(&self.c.data_dir)
|
||||
tokio::fs::create_dir_all(&self.data_dir)
|
||||
.await
|
||||
.context("Failed to create basebackup cache data directory")?;
|
||||
|
||||
@@ -287,8 +258,8 @@ impl BackgroundTask {
|
||||
.context("Failed to clean tmp directory")?;
|
||||
|
||||
// Read existing entries from the data_dir and add them to in-memory state.
|
||||
let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
|
||||
let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
|
||||
let mut entries = HashMap::new();
|
||||
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
|
||||
while let Some(dir_entry) = dir.next_entry().await? {
|
||||
let filename = dir_entry.file_name();
|
||||
|
||||
@@ -297,43 +268,33 @@ impl BackgroundTask {
|
||||
continue;
|
||||
}
|
||||
|
||||
let size_bytes = dir_entry
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
|
||||
})?
|
||||
.len();
|
||||
|
||||
self.entry_count += 1;
|
||||
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
|
||||
|
||||
self.total_size_bytes += size_bytes;
|
||||
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
|
||||
|
||||
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
|
||||
let Some((tenant_id, timeline_id, lsn)) = parsed else {
|
||||
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
|
||||
continue;
|
||||
};
|
||||
|
||||
let cur_entry = CacheEntry { lsn, size_bytes };
|
||||
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
use std::collections::hash_map::Entry::*;
|
||||
|
||||
match entries.entry(tti) {
|
||||
Occupied(mut entry) => {
|
||||
let found_entry = entry.get();
|
||||
let entry_lsn = *entry.get();
|
||||
// Leave only the latest entry, remove the old one.
|
||||
if cur_entry.lsn < found_entry.lsn {
|
||||
self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
|
||||
.await;
|
||||
} else if cur_entry.lsn > found_entry.lsn {
|
||||
self.try_remove_entry(tenant_id, timeline_id, found_entry)
|
||||
.await;
|
||||
entry.insert(cur_entry);
|
||||
if lsn < entry_lsn {
|
||||
self.remove_entry_sender.send(self.entry_path(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
))?;
|
||||
} else if lsn > entry_lsn {
|
||||
self.remove_entry_sender.send(self.entry_path(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
entry_lsn,
|
||||
))?;
|
||||
entry.insert(lsn);
|
||||
} else {
|
||||
// Two different filenames parsed to the same timline_id and LSN.
|
||||
// Should never happen.
|
||||
@@ -344,17 +305,22 @@ impl BackgroundTask {
|
||||
}
|
||||
}
|
||||
Vacant(entry) => {
|
||||
entry.insert(cur_entry);
|
||||
entry.insert(lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*self.c.entries.lock().unwrap() = entries;
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
|
||||
*self.entries.lock().unwrap() = entries;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
|
||||
async fn background(
|
||||
self: Arc<Self>,
|
||||
mut prepare_receiver: BasebackupPrepareReceiver,
|
||||
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
|
||||
) {
|
||||
// Panic in the background is a safe fallback.
|
||||
// It will drop receivers and the cache will be effectively disabled.
|
||||
self.on_startup()
|
||||
@@ -377,6 +343,11 @@ impl BackgroundTask {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Some(req) = remove_entry_receiver.recv() => {
|
||||
if let Err(e) = tokio::fs::remove_file(req).await {
|
||||
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
|
||||
}
|
||||
}
|
||||
_ = cleanup_ticker.tick() => {
|
||||
self.cleanup().await.unwrap_or_else(|e| {
|
||||
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
|
||||
@@ -390,67 +361,6 @@ impl BackgroundTask {
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to remove an entry from disk.
|
||||
/// The caller is responsible for removing the entry from the in-memory state.
|
||||
/// Updates size counters and corresponding metrics.
|
||||
/// Ignores the filesystem errors as not-so-important, but the size counters
|
||||
/// are not decremented in this case, so the file will continue to be counted
|
||||
/// towards the size limits.
|
||||
async fn try_remove_entry(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
entry: &CacheEntry,
|
||||
) {
|
||||
let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
|
||||
|
||||
match tokio::fs::remove_file(&entry_path).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
entry.lsn,
|
||||
e
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.entry_count -= 1;
|
||||
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
|
||||
|
||||
self.total_size_bytes -= entry.size_bytes;
|
||||
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
|
||||
}
|
||||
|
||||
/// Insert the cache entry into in-memory state and update the size counters.
|
||||
/// Assumes that the file for the entry already exists on disk.
|
||||
/// If the entry already exists with previous LSN, it will be removed.
|
||||
async fn upsert_entry(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
entry: CacheEntry,
|
||||
) {
|
||||
let tti = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
self.entry_count += 1;
|
||||
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
|
||||
|
||||
self.total_size_bytes += entry.size_bytes;
|
||||
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
|
||||
|
||||
let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
|
||||
|
||||
if let Some(old_entry) = old_entry {
|
||||
self.try_remove_entry(tenant_id, timeline_id, &old_entry)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare a basebackup for the given timeline.
|
||||
///
|
||||
/// If the basebackup already exists with a higher LSN or the timeline already
|
||||
@@ -459,7 +369,7 @@ impl BackgroundTask {
|
||||
/// The basebackup is prepared in a temporary directory and then moved to the final
|
||||
/// location to make the operation atomic.
|
||||
async fn prepare_basebackup(
|
||||
&mut self,
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
req_lsn: Lsn,
|
||||
@@ -473,44 +383,30 @@ impl BackgroundTask {
|
||||
|
||||
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
|
||||
|
||||
// TODO(diko): I don't think we will hit the limit,
|
||||
// but if we do, it makes sense to try to evict oldest entries. here
|
||||
if self.entry_count >= self.config.max_size_entries {
|
||||
tracing::info!(
|
||||
%tenant_shard_id,
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
"Basebackup cache is full (max_size_entries), skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.total_size_bytes >= self.config.max_total_size_bytes {
|
||||
tracing::info!(
|
||||
%tenant_shard_id,
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
"Basebackup cache is full (max_total_size_bytes), skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
{
|
||||
let entries = self.c.entries.lock().unwrap();
|
||||
if let Some(entry) = entries.get(&tti) {
|
||||
if entry.lsn >= req_lsn {
|
||||
let entries = self.entries.lock().unwrap();
|
||||
if let Some(&entry_lsn) = entries.get(&tti) {
|
||||
if entry_lsn >= req_lsn {
|
||||
tracing::info!(
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
%entry.lsn,
|
||||
%entry_lsn,
|
||||
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if entries.len() as i64 >= self.config.max_size_entries {
|
||||
tracing::info!(
|
||||
%timeline_id,
|
||||
%req_lsn,
|
||||
"Basebackup cache is full, skipping basebackup",
|
||||
);
|
||||
self.prepare_skip_count.inc();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let tenant = self
|
||||
@@ -546,21 +442,18 @@ impl BackgroundTask {
|
||||
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
|
||||
.await;
|
||||
|
||||
let entry = match res {
|
||||
Ok(entry) => entry,
|
||||
Err(err) => {
|
||||
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
|
||||
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
|
||||
match tokio::fs::remove_file(&entry_tmp_path).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
|
||||
}
|
||||
if let Err(err) = res {
|
||||
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
|
||||
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
|
||||
match tokio::fs::remove_file(&entry_tmp_path).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
// Move the tmp file to the final location atomically.
|
||||
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
|
||||
@@ -568,13 +461,17 @@ impl BackgroundTask {
|
||||
// It's not necessary to fsync the inode after renaming, because the worst case is that
|
||||
// the rename operation will be rolled back on the disk failure, the entry will disappear
|
||||
// from the main directory, and the entry access will cause a cache miss.
|
||||
let entry_path = self
|
||||
.c
|
||||
.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
|
||||
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
|
||||
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
|
||||
|
||||
self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
|
||||
.await;
|
||||
let mut entries = self.entries.lock().unwrap();
|
||||
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
|
||||
// Remove the old entry if it exists.
|
||||
self.remove_entry_sender
|
||||
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
|
||||
.unwrap();
|
||||
}
|
||||
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
|
||||
|
||||
self.prepare_ok_count.inc();
|
||||
Ok(())
|
||||
@@ -587,7 +484,7 @@ impl BackgroundTask {
|
||||
entry_tmp_path: &Utf8Path,
|
||||
timeline: &Arc<Timeline>,
|
||||
req_lsn: Lsn,
|
||||
) -> anyhow::Result<CacheEntry> {
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
|
||||
let ctx = ctx.with_scope_timeline(timeline);
|
||||
|
||||
@@ -627,12 +524,6 @@ impl BackgroundTask {
|
||||
writer.flush().await?;
|
||||
writer.into_inner().sync_all().await?;
|
||||
|
||||
// TODO(diko): we can count it via Writer wrapper instead of a syscall.
|
||||
let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
|
||||
|
||||
Ok(CacheEntry {
|
||||
lsn: req_lsn,
|
||||
size_bytes,
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,7 +159,14 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
Ok(m) => {
|
||||
// Since we run one time at startup, be generous in our logging and
|
||||
// dump all metadata.
|
||||
tracing::info!("Loaded node metadata: {m}");
|
||||
tracing::info!(
|
||||
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
|
||||
m.postgres_host,
|
||||
m.postgres_port,
|
||||
m.http_host,
|
||||
m.http_port,
|
||||
m.other
|
||||
);
|
||||
|
||||
let az_id = {
|
||||
let az_id_from_metadata = m
|
||||
@@ -188,8 +195,6 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
node_id: conf.id,
|
||||
listen_pg_addr: m.postgres_host,
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_grpc_addr: m.grpc_host,
|
||||
listen_grpc_port: m.grpc_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
listen_https_port: m.https_port,
|
||||
|
||||
@@ -4428,16 +4428,18 @@ pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_basebackup_cache_entries_total",
|
||||
"Number of entries in the basebackup cache"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
// FIXME: Support basebackup cache size metrics.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_basebackup_cache_size_bytes",
|
||||
"Total size of all basebackup cache entries on disk in bytes"
|
||||
)
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::{io, str};
|
||||
|
||||
use anyhow::{Context as _, anyhow, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::{Buf as _, BufMut as _, BytesMut};
|
||||
use bytes::{Buf, BytesMut};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, Stream};
|
||||
use itertools::Itertools;
|
||||
@@ -623,6 +623,60 @@ enum PageStreamError {
|
||||
BadRequest(Cow<'static, str>),
|
||||
}
|
||||
|
||||
impl PageStreamError {
|
||||
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
|
||||
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
|
||||
/// convenience method for use from a get_pages gRPC stream.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn into_get_page_response(
|
||||
self,
|
||||
request_id: page_api::RequestID,
|
||||
) -> Result<proto::GetPageResponse, tonic::Status> {
|
||||
use page_api::GetPageStatusCode;
|
||||
use tonic::Code;
|
||||
|
||||
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
|
||||
let status: tonic::Status = self.into();
|
||||
let status_code = match status.code() {
|
||||
// We shouldn't see an OK status here, because we're emitting an error.
|
||||
Code::Ok => {
|
||||
debug_assert_ne!(status.code(), Code::Ok);
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"unexpected OK status: {status:?}",
|
||||
)));
|
||||
}
|
||||
|
||||
// These are per-request errors, returned as GetPageResponses.
|
||||
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
|
||||
Code::DataLoss => GetPageStatusCode::InternalError,
|
||||
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
|
||||
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
|
||||
Code::Internal => GetPageStatusCode::InternalError,
|
||||
Code::NotFound => GetPageStatusCode::NotFound,
|
||||
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
|
||||
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
|
||||
|
||||
// These should terminate the stream.
|
||||
Code::Aborted => return Err(status),
|
||||
Code::Cancelled => return Err(status),
|
||||
Code::DeadlineExceeded => return Err(status),
|
||||
Code::PermissionDenied => return Err(status),
|
||||
Code::Unauthenticated => return Err(status),
|
||||
Code::Unavailable => return Err(status),
|
||||
Code::Unimplemented => return Err(status),
|
||||
Code::Unknown => return Err(status),
|
||||
};
|
||||
|
||||
Ok(page_api::GetPageResponse {
|
||||
request_id,
|
||||
status_code,
|
||||
reason: Some(status.message().to_string()),
|
||||
page_images: Vec::new(),
|
||||
}
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageStreamError> for tonic::Status {
|
||||
fn from(err: PageStreamError) -> Self {
|
||||
use tonic::Code;
|
||||
@@ -3384,8 +3438,8 @@ impl GrpcPageServiceHandler {
|
||||
|
||||
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
|
||||
///
|
||||
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
|
||||
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
|
||||
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
|
||||
/// with an appropriate status code instead.
|
||||
///
|
||||
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
|
||||
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
|
||||
@@ -3402,7 +3456,7 @@ impl GrpcPageServiceHandler {
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request, decorate the span, and convert it to a Pagestream request.
|
||||
let req = page_api::GetPageRequest::try_from(req)?;
|
||||
let req: page_api::GetPageRequest = req.try_into()?;
|
||||
|
||||
span_record!(
|
||||
req_id = %req.request_id,
|
||||
@@ -3413,7 +3467,7 @@ impl GrpcPageServiceHandler {
|
||||
);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
|
||||
let effective_lsn = PageServerHandler::effective_request_lsn(
|
||||
let effective_lsn = match PageServerHandler::effective_request_lsn(
|
||||
&timeline,
|
||||
timeline.get_last_record_lsn(),
|
||||
req.read_lsn.request_lsn,
|
||||
@@ -3421,7 +3475,10 @@ impl GrpcPageServiceHandler {
|
||||
.not_modified_since_lsn
|
||||
.unwrap_or(req.read_lsn.request_lsn),
|
||||
&latest_gc_cutoff_lsn,
|
||||
)?;
|
||||
) {
|
||||
Ok(lsn) => lsn,
|
||||
Err(err) => return err.into_get_page_response(req.request_id),
|
||||
};
|
||||
|
||||
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
|
||||
for blkno in req.block_numbers {
|
||||
@@ -3478,7 +3535,7 @@ impl GrpcPageServiceHandler {
|
||||
"unexpected response: {resp:?}"
|
||||
)));
|
||||
}
|
||||
Err(err) => return Err(err.err.into()),
|
||||
Err(err) => return err.err.into_get_page_response(req.request_id),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -3544,44 +3601,42 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_timeline(&timeline);
|
||||
|
||||
// Validate the request and decorate the span.
|
||||
// Validate the request, decorate the span, and wait for the LSN to arrive.
|
||||
//
|
||||
// TODO: this requires a read LSN, is that ok?
|
||||
Self::ensure_shard_zero(&timeline)?;
|
||||
if timeline.is_archived() == Some(true) {
|
||||
return Err(tonic::Status::failed_precondition("timeline is archived"));
|
||||
}
|
||||
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
|
||||
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
|
||||
|
||||
span_record!(lsn=?req.lsn);
|
||||
span_record!(lsn=%req.read_lsn);
|
||||
|
||||
// Wait for the LSN to arrive, if given.
|
||||
if let Some(lsn) = req.lsn {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
timeline
|
||||
.wait_lsn(
|
||||
lsn,
|
||||
WaitLsnWaiter::PageService,
|
||||
WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
|
||||
.map_err(|err| {
|
||||
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
|
||||
})?;
|
||||
}
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
timeline
|
||||
.wait_lsn(
|
||||
req.read_lsn.request_lsn,
|
||||
WaitLsnWaiter::PageService,
|
||||
WaitLsnTimeout::Default,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
|
||||
.map_err(|err| {
|
||||
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
|
||||
})?;
|
||||
|
||||
// Spawn a task to run the basebackup.
|
||||
//
|
||||
// TODO: do we need to support full base backups, for debugging? This also requires passing
|
||||
// the prev_lsn parameter.
|
||||
// TODO: do we need to support full base backups, for debugging?
|
||||
let span = Span::current();
|
||||
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
|
||||
let jh = tokio::spawn(async move {
|
||||
let result = basebackup::send_basebackup_tarball(
|
||||
&mut simplex_write,
|
||||
&timeline,
|
||||
req.lsn,
|
||||
Some(req.read_lsn.request_lsn),
|
||||
None,
|
||||
false,
|
||||
req.replica,
|
||||
@@ -3597,21 +3652,20 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
|
||||
// Emit chunks of size CHUNK_SIZE.
|
||||
let chunks = async_stream::try_stream! {
|
||||
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
|
||||
loop {
|
||||
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
|
||||
loop {
|
||||
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
|
||||
})?;
|
||||
if n == 0 {
|
||||
break; // full chunk or closed stream
|
||||
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
|
||||
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
|
||||
})?;
|
||||
|
||||
// If we read 0 bytes, either the chunk is full or the stream is closed.
|
||||
if n == 0 {
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
|
||||
chunk.clear();
|
||||
}
|
||||
let chunk = chunk.into_inner().freeze();
|
||||
if chunk.is_empty() {
|
||||
break;
|
||||
}
|
||||
yield proto::GetBaseBackupResponseChunk::from(chunk);
|
||||
}
|
||||
// Wait for the basebackup task to exit and check for errors.
|
||||
jh.await.map_err(|err| {
|
||||
@@ -3688,16 +3742,9 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.await?
|
||||
.downgrade();
|
||||
while let Some(req) = reqs.message().await? {
|
||||
let req_id = req.request_id;
|
||||
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
|
||||
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
|
||||
.instrument(span.clone()) // propagate request span
|
||||
.await;
|
||||
yield match result {
|
||||
Ok(resp) => resp,
|
||||
// Convert per-request errors to GetPageResponses as appropriate, or terminate
|
||||
// the stream with a tonic::Status.
|
||||
Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(),
|
||||
}
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ OBJS = \
|
||||
unstable_extensions.o \
|
||||
walproposer.o \
|
||||
walproposer_pg.o \
|
||||
neon_ddl_handler.o \
|
||||
control_plane_connector.o \
|
||||
walsender_hooks.o
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* neon_ddl_handler.c
|
||||
* control_plane_connector.c
|
||||
* Captures updates to roles/databases using ProcessUtility_hook and
|
||||
* sends them to the control ProcessUtility_hook. The changes are sent
|
||||
* via HTTP to the URL specified by the GUC neon.console_url when the
|
||||
@@ -13,30 +13,18 @@
|
||||
* accumulate changes. On subtransaction commit, the top of the stack
|
||||
* is merged with the table below it.
|
||||
*
|
||||
* Support event triggers for neon_superuser
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/neon_dll_handler.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/event_trigger.h"
|
||||
#include "commands/user.h"
|
||||
#include "fmgr.h"
|
||||
#include "libpq/crypt.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "parser/parse_func.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "utils/acl.h"
|
||||
@@ -44,16 +32,11 @@
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/jsonb.h"
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/syscache.h>
|
||||
|
||||
#include "neon_ddl_handler.h"
|
||||
#include "control_plane_connector.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
|
||||
static fmgr_hook_type next_fmgr_hook = NULL;
|
||||
static needs_fmgr_hook_type next_needs_fmgr_hook = NULL;
|
||||
static bool neon_event_triggers = true;
|
||||
|
||||
static const char *jwt_token = NULL;
|
||||
|
||||
@@ -790,7 +773,6 @@ HandleDropRole(DropRoleStmt *stmt)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
HandleRename(RenameStmt *stmt)
|
||||
{
|
||||
@@ -800,460 +782,6 @@ HandleRename(RenameStmt *stmt)
|
||||
return HandleRoleRename(stmt);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Support for Event Triggers.
|
||||
*
|
||||
* In vanilla only superuser can create Event Triggers.
|
||||
*
|
||||
* We allow it for neon_superuser by temporary switching to superuser. But as
|
||||
* far as event trigger can fire in superuser context we should protect
|
||||
* superuser from execution of arbitrary user's code.
|
||||
*
|
||||
* The idea was taken from Supabase PR series starting at
|
||||
* https://github.com/supabase/supautils/pull/98
|
||||
*/
|
||||
|
||||
static bool
|
||||
neon_needs_fmgr_hook(Oid functionId) {
|
||||
|
||||
return (next_needs_fmgr_hook && (*next_needs_fmgr_hook) (functionId))
|
||||
|| get_func_rettype(functionId) == EVENT_TRIGGEROID;
|
||||
}
|
||||
|
||||
static void
|
||||
LookupFuncOwnerSecDef(Oid functionId, Oid *funcOwner, bool *is_secdef)
|
||||
{
|
||||
Form_pg_proc procForm;
|
||||
HeapTuple proc_tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionId));
|
||||
|
||||
if (!HeapTupleIsValid(proc_tup))
|
||||
ereport(ERROR,
|
||||
(errmsg("cache lookup failed for function %u", functionId)));
|
||||
|
||||
procForm = (Form_pg_proc) GETSTRUCT(proc_tup);
|
||||
|
||||
*funcOwner = procForm->proowner;
|
||||
*is_secdef = procForm->prosecdef;
|
||||
|
||||
ReleaseSysCache(proc_tup);
|
||||
}
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(noop);
|
||||
Datum noop(__attribute__ ((unused)) PG_FUNCTION_ARGS) { PG_RETURN_VOID();}
|
||||
|
||||
static void
|
||||
force_noop(FmgrInfo *finfo)
|
||||
{
|
||||
finfo->fn_addr = (PGFunction) noop;
|
||||
finfo->fn_oid = InvalidOid; /* not a known function OID anymore */
|
||||
finfo->fn_nargs = 0; /* no arguments for noop */
|
||||
finfo->fn_strict = false;
|
||||
finfo->fn_retset = false;
|
||||
finfo->fn_stats = 0; /* no stats collection */
|
||||
finfo->fn_extra = NULL; /* clear out old context data */
|
||||
finfo->fn_mcxt = CurrentMemoryContext;
|
||||
finfo->fn_expr = NULL; /* no parse tree */
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Skip executing Event Triggers execution for superusers, because Event
|
||||
* Triggers are SECURITY DEFINER and user provided code could then attempt
|
||||
* privilege escalation.
|
||||
*
|
||||
* Also skip executing Event Triggers when GUC neon.event_triggers has been
|
||||
* set to false. This might be necessary to be able to connect again after a
|
||||
* LOGIN Event Trigger has been installed that would prevent connections as
|
||||
* neon_superuser.
|
||||
*/
|
||||
static void
|
||||
neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
|
||||
{
|
||||
/*
|
||||
* It can be other needs_fmgr_hook which cause our hook to be invoked for
|
||||
* non-trigger function, so recheck that is is trigger function.
|
||||
*/
|
||||
if (flinfo->fn_oid != InvalidOid &&
|
||||
get_func_rettype(flinfo->fn_oid) != EVENT_TRIGGEROID)
|
||||
{
|
||||
if (next_fmgr_hook)
|
||||
(*next_fmgr_hook) (event, flinfo, private);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* The neon_superuser role can use the GUC neon.event_triggers to disable
|
||||
* firing Event Trigger.
|
||||
*
|
||||
* SET neon.event_triggers TO false;
|
||||
*
|
||||
* This only applies to the neon_superuser role though, and only allows
|
||||
* skipping Event Triggers owned by neon_superuser, which we check by
|
||||
* proxy of the Event Trigger function being owned by neon_superuser.
|
||||
*
|
||||
* A role that is created in role neon_superuser should be allowed to also
|
||||
* benefit from the neon_event_triggers GUC, and will be considered the
|
||||
* same as the neon_superuser role.
|
||||
*/
|
||||
if (event == FHET_START
|
||||
&& !neon_event_triggers
|
||||
&& is_neon_superuser())
|
||||
{
|
||||
Oid neon_superuser_oid = get_role_oid("neon_superuser", false);
|
||||
|
||||
/* Find the Function Attributes (owner Oid, security definer) */
|
||||
const char *fun_owner_name = NULL;
|
||||
Oid fun_owner = InvalidOid;
|
||||
bool fun_is_secdef = false;
|
||||
|
||||
LookupFuncOwnerSecDef(flinfo->fn_oid, &fun_owner, &fun_is_secdef);
|
||||
fun_owner_name = GetUserNameFromId(fun_owner, false);
|
||||
|
||||
if (RoleIsNeonSuperuser(fun_owner_name)
|
||||
|| has_privs_of_role(fun_owner, neon_superuser_oid))
|
||||
{
|
||||
elog(WARNING,
|
||||
"Skipping Event Trigger: neon.event_triggers is false");
|
||||
|
||||
/*
|
||||
* we can't skip execution directly inside the fmgr_hook so instead we
|
||||
* change the event trigger function to a noop function.
|
||||
*/
|
||||
force_noop(flinfo);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Fire Event Trigger if both function owner and current user are
|
||||
* superuser, or none of them are.
|
||||
*/
|
||||
else if (event == FHET_START
|
||||
/* still enable it to pass pg_regress tests */
|
||||
&& !RegressTestMode)
|
||||
{
|
||||
/*
|
||||
* Get the current user oid as of before SECURITY DEFINER change of
|
||||
* CurrentUserId, and that would be SessionUserId.
|
||||
*/
|
||||
Oid current_role_oid = GetSessionUserId();
|
||||
bool role_is_super = superuser_arg(current_role_oid);
|
||||
|
||||
/* Find the Function Attributes (owner Oid, security definer) */
|
||||
Oid function_owner = InvalidOid;
|
||||
bool function_is_secdef = false;
|
||||
bool function_is_owned_by_super = false;
|
||||
|
||||
LookupFuncOwnerSecDef(flinfo->fn_oid, &function_owner, &function_is_secdef);
|
||||
|
||||
function_is_owned_by_super = superuser_arg(function_owner);
|
||||
|
||||
/*
|
||||
* 1. Refuse to run SECURITY DEFINER function that belongs to a
|
||||
* superuser when the current user is not a superuser itself.
|
||||
*/
|
||||
if (!role_is_super
|
||||
&& function_is_owned_by_super
|
||||
&& function_is_secdef)
|
||||
{
|
||||
char *func_name = get_func_name(flinfo->fn_oid);
|
||||
|
||||
ereport(WARNING,
|
||||
(errmsg("Skipping Event Trigger"),
|
||||
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
|
||||
"and is SECURITY DEFINER",
|
||||
func_name,
|
||||
GetUserNameFromId(function_owner, false))));
|
||||
|
||||
/*
|
||||
* we can't skip execution directly inside the fmgr_hook so
|
||||
* instead we change the event trigger function to a noop
|
||||
* function.
|
||||
*/
|
||||
force_noop(flinfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* 2. Refuse to run functions that belongs to a non-superuser when the
|
||||
* current user is a superuser.
|
||||
*
|
||||
* We could run a SECURITY DEFINER user-function here and be safe with
|
||||
* privilege escalation risks, but superuser roles are only used for
|
||||
* infrastructure maintenance operations, where we prefer to skip
|
||||
* running user-defined code.
|
||||
*/
|
||||
else if (role_is_super && !function_is_owned_by_super)
|
||||
{
|
||||
char *func_name = get_func_name(flinfo->fn_oid);
|
||||
|
||||
ereport(WARNING,
|
||||
(errmsg("Skipping Event Trigger"),
|
||||
errdetail("Event Trigger function \"%s\" "
|
||||
"is owned by non-superuser role \"%s\", "
|
||||
"and current_user \"%s\" is superuser",
|
||||
func_name,
|
||||
GetUserNameFromId(function_owner, false),
|
||||
GetUserNameFromId(current_role_oid, false))));
|
||||
|
||||
/*
|
||||
* we can't skip execution directly inside the fmgr_hook so
|
||||
* instead we change the event trigger function to a noop
|
||||
* function.
|
||||
*/
|
||||
force_noop(flinfo);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (next_fmgr_hook)
|
||||
(*next_fmgr_hook) (event, flinfo, private);
|
||||
}
|
||||
|
||||
static Oid prev_role_oid = 0;
|
||||
static int prev_role_sec_context = 0;
|
||||
static bool switched_to_superuser = false;
|
||||
|
||||
/*
|
||||
* Switch tp superuser if not yet superuser.
|
||||
* Returns false if already switched to superuser.
|
||||
*/
|
||||
static bool
|
||||
switch_to_superuser(void)
|
||||
{
|
||||
Oid superuser_oid;
|
||||
|
||||
if (switched_to_superuser)
|
||||
return false;
|
||||
switched_to_superuser = true;
|
||||
|
||||
superuser_oid = get_role_oid("cloud_admin", true /*missing_ok*/);
|
||||
if (superuser_oid == InvalidOid)
|
||||
superuser_oid = BOOTSTRAP_SUPERUSERID;
|
||||
|
||||
GetUserIdAndSecContext(&prev_role_oid, &prev_role_sec_context);
|
||||
SetUserIdAndSecContext(superuser_oid, prev_role_sec_context |
|
||||
SECURITY_LOCAL_USERID_CHANGE |
|
||||
SECURITY_RESTRICTED_OPERATION);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void
|
||||
switch_to_original_role(void)
|
||||
{
|
||||
SetUserIdAndSecContext(prev_role_oid, prev_role_sec_context);
|
||||
switched_to_superuser = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* ALTER ROLE ... SUPERUSER;
|
||||
*
|
||||
* Used internally to give superuser to a non-privileged role to allow
|
||||
* ownership of superuser-only objects such as Event Trigger.
|
||||
*
|
||||
* ALTER ROLE foo SUPERUSER;
|
||||
* ALTER EVENT TRIGGER ... OWNED BY foo;
|
||||
* ALTER ROLE foo NOSUPERUSER;
|
||||
*
|
||||
* Now the EVENT TRIGGER is owned by foo, who can DROP it without having to be
|
||||
* superuser again.
|
||||
*/
|
||||
static void
|
||||
alter_role_super(const char* rolename, bool make_super)
|
||||
{
|
||||
AlterRoleStmt *alter_stmt = makeNode(AlterRoleStmt);
|
||||
|
||||
DefElem *defel_superuser =
|
||||
#if PG_MAJORVERSION_NUM <= 14
|
||||
makeDefElem("superuser", (Node *) makeInteger(make_super), -1);
|
||||
#else
|
||||
makeDefElem("superuser", (Node *) makeBoolean(make_super), -1);
|
||||
#endif
|
||||
|
||||
RoleSpec *rolespec = makeNode(RoleSpec);
|
||||
rolespec->roletype = ROLESPEC_CSTRING;
|
||||
rolespec->rolename = pstrdup(rolename);
|
||||
rolespec->location = -1;
|
||||
|
||||
alter_stmt->role = rolespec;
|
||||
alter_stmt->options = list_make1(defel_superuser);
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 15
|
||||
AlterRole(alter_stmt);
|
||||
#else
|
||||
/* ParseState *pstate, AlterRoleStmt *stmt */
|
||||
AlterRole(NULL, alter_stmt);
|
||||
#endif
|
||||
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Changes the OWNER of an Event Trigger.
|
||||
*
|
||||
* Event Triggers can only be owned by superusers, so this ALTER ROLE with
|
||||
* SUPERUSER and then removes the property.
|
||||
*/
|
||||
static void
|
||||
alter_event_trigger_owner(const char *obj_name, Oid role_oid)
|
||||
{
|
||||
char* role_name = GetUserNameFromId(role_oid, false);
|
||||
|
||||
alter_role_super(role_name, true);
|
||||
|
||||
AlterEventTriggerOwner(obj_name, role_oid);
|
||||
CommandCounterIncrement();
|
||||
|
||||
alter_role_super(role_name, false);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Neon processing of the CREATE EVENT TRIGGER requires special attention and
|
||||
* is worth having its own ProcessUtility_hook for that.
|
||||
*/
|
||||
static void
|
||||
ProcessCreateEventTrigger(
|
||||
PlannedStmt *pstmt,
|
||||
const char *queryString,
|
||||
bool readOnlyTree,
|
||||
ProcessUtilityContext context,
|
||||
ParamListInfo params,
|
||||
QueryEnvironment *queryEnv,
|
||||
DestReceiver *dest,
|
||||
QueryCompletion *qc)
|
||||
{
|
||||
Node *parseTree = pstmt->utilityStmt;
|
||||
bool sudo = false;
|
||||
|
||||
/* We double-check that after local variable declaration block */
|
||||
CreateEventTrigStmt *stmt = (CreateEventTrigStmt *) parseTree;
|
||||
|
||||
/*
|
||||
* We are going to change the current user privileges (sudo) and might
|
||||
* need after execution cleanup. For that we want to capture the UserId
|
||||
* before changing it for our sudo implementation.
|
||||
*/
|
||||
const Oid current_user_id = GetUserId();
|
||||
bool current_user_is_super = superuser_arg(current_user_id);
|
||||
|
||||
if (nodeTag(parseTree) != T_CreateEventTrigStmt)
|
||||
{
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("ProcessCreateEventTrigger called for the wrong command"));
|
||||
}
|
||||
|
||||
/*
|
||||
* Allow neon_superuser to create Event Trigger, while keeping the
|
||||
* ownership of the object.
|
||||
*
|
||||
* For that we give superuser membership to the role for the execution of
|
||||
* the command.
|
||||
*/
|
||||
if (IsTransactionState() && is_neon_superuser())
|
||||
{
|
||||
/* Find the Event Trigger function Oid */
|
||||
Oid func_oid = LookupFuncName(stmt->funcname, 0, NULL, false);
|
||||
|
||||
/* Find the Function Owner Oid */
|
||||
Oid func_owner = InvalidOid;
|
||||
bool is_secdef = false;
|
||||
bool function_is_owned_by_super = false;
|
||||
|
||||
LookupFuncOwnerSecDef(func_oid, &func_owner, &is_secdef);
|
||||
|
||||
function_is_owned_by_super = superuser_arg(func_owner);
|
||||
|
||||
if(!current_user_is_super && function_is_owned_by_super)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("Permission denied to execute "
|
||||
"a function owned by a superuser role"),
|
||||
errdetail("current user \"%s\" is not a superuser "
|
||||
"and Event Trigger function \"%s\" "
|
||||
"is owned by a superuser",
|
||||
GetUserNameFromId(current_user_id, false),
|
||||
NameListToString(stmt->funcname))));
|
||||
}
|
||||
|
||||
if(current_user_is_super && !function_is_owned_by_super)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("Permission denied to execute "
|
||||
"a function owned by a non-superuser role"),
|
||||
errdetail("current user \"%s\" is a superuser "
|
||||
"and function \"%s\" is "
|
||||
"owned by a non-superuser",
|
||||
GetUserNameFromId(current_user_id, false),
|
||||
NameListToString(stmt->funcname))));
|
||||
}
|
||||
|
||||
sudo = switch_to_superuser();
|
||||
}
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
if (PreviousProcessUtilityHook)
|
||||
{
|
||||
PreviousProcessUtilityHook(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ProcessUtility(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
|
||||
/*
|
||||
* Now that the Event Trigger has been installed via our sudo
|
||||
* mechanism, if the original role was not a superuser then change
|
||||
* the event trigger ownership back to the original role.
|
||||
*
|
||||
* That way [ ALTER | DROP ] EVENT TRIGGER commands just work.
|
||||
*/
|
||||
if (IsTransactionState() && is_neon_superuser())
|
||||
{
|
||||
if (!current_user_is_super)
|
||||
{
|
||||
/*
|
||||
* Change event trigger owner to the current role (making
|
||||
* it a privileged role during the ALTER OWNER command).
|
||||
*/
|
||||
alter_event_trigger_owner(stmt->trigname, current_user_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
if (sudo)
|
||||
switch_to_original_role();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Neon hooks for DDLs (handling privileges, limiting features, etc).
|
||||
*/
|
||||
static void
|
||||
NeonProcessUtility(
|
||||
PlannedStmt *pstmt,
|
||||
@@ -1267,27 +795,6 @@ NeonProcessUtility(
|
||||
{
|
||||
Node *parseTree = pstmt->utilityStmt;
|
||||
|
||||
/*
|
||||
* The process utility hook for CREATE EVENT TRIGGER is its own
|
||||
* implementation and warrant being addressed separately from here.
|
||||
*/
|
||||
if (nodeTag(parseTree) == T_CreateEventTrigStmt)
|
||||
{
|
||||
ProcessCreateEventTrigger(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Other commands that need Neon specific implementations are handled here:
|
||||
*/
|
||||
switch (nodeTag(parseTree))
|
||||
{
|
||||
case T_CreatedbStmt:
|
||||
@@ -1326,82 +833,37 @@ NeonProcessUtility(
|
||||
if (PreviousProcessUtilityHook)
|
||||
{
|
||||
PreviousProcessUtilityHook(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ProcessUtility(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Only neon_superuser is granted privilege to edit neon.event_triggers GUC.
|
||||
*/
|
||||
static void
|
||||
neon_event_triggers_assign_hook(bool newval, void *extra)
|
||||
{
|
||||
/* MyDatabaseId == InvalidOid || !OidIsValid(GetUserId()) */
|
||||
|
||||
if (IsTransactionState() && !is_neon_superuser())
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("permission denied to set neon.event_triggers"),
|
||||
errdetail("Only \"neon_superuser\" is allowed to set the GUC")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
InitDDLHandler()
|
||||
InitControlPlaneConnector()
|
||||
{
|
||||
PreviousProcessUtilityHook = ProcessUtility_hook;
|
||||
ProcessUtility_hook = NeonProcessUtility;
|
||||
|
||||
next_needs_fmgr_hook = needs_fmgr_hook;
|
||||
needs_fmgr_hook = neon_needs_fmgr_hook;
|
||||
|
||||
next_fmgr_hook = fmgr_hook;
|
||||
fmgr_hook = neon_fmgr_hook;
|
||||
|
||||
RegisterXactCallback(NeonXactCallback, NULL);
|
||||
RegisterSubXactCallback(NeonSubXactCallback, NULL);
|
||||
|
||||
/*
|
||||
* The GUC neon.event_triggers should provide the same effect as the
|
||||
* Postgres GUC event_triggers, but the neon one is PGC_USERSET.
|
||||
*
|
||||
* This allows using the GUC in the connection string and work out of a
|
||||
* LOGIN Event Trigger that would break database access, all without
|
||||
* having to edit and reload the Postgres configuration file.
|
||||
*/
|
||||
DefineCustomBoolVariable(
|
||||
"neon.event_triggers",
|
||||
"Enable firing of event triggers",
|
||||
NULL,
|
||||
&neon_event_triggers,
|
||||
true,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
neon_event_triggers_assign_hook,
|
||||
NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"neon.console_url",
|
||||
"URL of the Neon Console, which will be forwarded changes to dbs and roles",
|
||||
6
pgxn/neon/control_plane_connector.h
Normal file
6
pgxn/neon/control_plane_connector.h
Normal file
@@ -0,0 +1,6 @@
|
||||
#ifndef CONTROL_PLANE_CONNECTOR_H
|
||||
#define CONTROL_PLANE_CONNECTOR_H
|
||||
|
||||
void InitControlPlaneConnector(void);
|
||||
|
||||
#endif
|
||||
@@ -33,9 +33,9 @@
|
||||
#include "extension_server.h"
|
||||
#include "file_cache.h"
|
||||
#include "neon.h"
|
||||
#include "neon_ddl_handler.h"
|
||||
#include "neon_lwlsncache.h"
|
||||
#include "neon_perf_counters.h"
|
||||
#include "control_plane_connector.h"
|
||||
#include "logical_replication_monitor.h"
|
||||
#include "unstable_extensions.h"
|
||||
#include "walsender_hooks.h"
|
||||
@@ -454,7 +454,7 @@ _PG_init(void)
|
||||
|
||||
InitUnstableExtensionsSupport();
|
||||
InitLogicalReplicationMonitor();
|
||||
InitDDLHandler();
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
#ifndef CONTROL_DDL_HANDLER_H
|
||||
#define CONTROL_DDL_HANDLER_H
|
||||
|
||||
void InitDDLHandler(void);
|
||||
|
||||
#endif
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;
|
||||
@@ -5,11 +5,10 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
@@ -421,31 +420,23 @@ impl ComputeHook {
|
||||
preferred_az: _preferred_az,
|
||||
} = reconfigure_request;
|
||||
|
||||
let compute_pageservers = shards
|
||||
.iter()
|
||||
.map(|shard| {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(pg_host, pg_port.unwrap_or(5432))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring endpoint {endpoint_name}");
|
||||
|
||||
let pageservers = shards
|
||||
.iter()
|
||||
.map(|shard| {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
if endpoint.grpc {
|
||||
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
|
||||
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
|
||||
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
|
||||
(PageserverProtocol::Grpc, host, port)
|
||||
} else {
|
||||
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
|
||||
endpoint
|
||||
.reconfigure(pageservers, *stripe_size, None)
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
|
||||
.await
|
||||
.map_err(NotifyError::NeonLocal)?;
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
/// Count of HTTP requests to the safekeeper that resulted in an error,
|
||||
/// broken down by the safekeeper node id, request name and method
|
||||
pub(crate) storage_controller_safekeeper_request_error:
|
||||
measured::CounterVec<SafekeeperRequestLabelGroupSet>,
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Latency of HTTP requests to the pageserver, broken down by pageserver
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
@@ -111,7 +111,7 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
/// requests.
|
||||
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
|
||||
pub(crate) storage_controller_safekeeper_request_latency:
|
||||
measured::HistogramVec<SafekeeperRequestLabelGroupSet, 5>,
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
|
||||
/// broken down by the pageserver node id, request name and method
|
||||
@@ -136,8 +136,7 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
|
||||
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
|
||||
|
||||
/// Indicator of stucked (long-running) reconciles, broken down by tenant, shard and sequence.
|
||||
/// The metric is automatically removed once the reconciliation completes.
|
||||
/// HTTP request status counters for handled requests
|
||||
pub(crate) storage_controller_reconcile_long_running:
|
||||
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
|
||||
|
||||
@@ -219,16 +218,6 @@ pub(crate) struct PageserverRequestLabelGroup<'a> {
|
||||
pub(crate) method: Method,
|
||||
}
|
||||
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = SafekeeperRequestLabelGroupSet)]
|
||||
pub(crate) struct SafekeeperRequestLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
pub(crate) safekeeper_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
pub(crate) path: &'a str,
|
||||
pub(crate) method: Method,
|
||||
}
|
||||
|
||||
#[derive(measured::LabelGroup)]
|
||||
#[label(set = DatabaseQueryErrorLabelGroupSet)]
|
||||
pub(crate) struct DatabaseQueryErrorLabelGroup {
|
||||
|
||||
@@ -37,8 +37,6 @@ pub(crate) struct Node {
|
||||
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
listen_grpc_addr: Option<String>,
|
||||
listen_grpc_port: Option<u16>,
|
||||
|
||||
availability_zone_id: AvailabilityZone,
|
||||
|
||||
@@ -102,8 +100,8 @@ impl Node {
|
||||
self.id == register_req.node_id
|
||||
&& self.listen_http_addr == register_req.listen_http_addr
|
||||
&& self.listen_http_port == register_req.listen_http_port
|
||||
// Note: HTTPS and gRPC addresses may change, to allow for migrations. See
|
||||
// [`Self::need_update`] for more details.
|
||||
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
|
||||
// && self.listen_https_port == register_req.listen_https_port
|
||||
&& self.listen_pg_addr == register_req.listen_pg_addr
|
||||
&& self.listen_pg_port == register_req.listen_pg_port
|
||||
&& self.availability_zone_id == register_req.availability_zone_id
|
||||
@@ -111,10 +109,9 @@ impl Node {
|
||||
|
||||
// Do we need to update an existing record in DB on this registration request?
|
||||
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
|
||||
// These are checked here, since they may change before we're fully migrated.
|
||||
// listen_https_port is checked here because it may change during migration to https.
|
||||
// After migration, this check may be moved to registration_match.
|
||||
self.listen_https_port != register_req.listen_https_port
|
||||
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|
||||
|| self.listen_grpc_port != register_req.listen_grpc_port
|
||||
}
|
||||
|
||||
/// For a shard located on this node, populate a response object
|
||||
@@ -128,8 +125,6 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,8 +211,6 @@ impl Node {
|
||||
listen_https_port: Option<u16>,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
listen_grpc_addr: Option<String>,
|
||||
listen_grpc_port: Option<u16>,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
use_https: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
@@ -228,10 +221,6 @@ impl Node {
|
||||
);
|
||||
}
|
||||
|
||||
if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
|
||||
anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
id,
|
||||
listen_http_addr,
|
||||
@@ -239,8 +228,6 @@ impl Node {
|
||||
listen_https_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
lifecycle: NodeLifecycle::Active,
|
||||
availability: NodeAvailability::Offline,
|
||||
@@ -260,8 +247,6 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port.map(|x| x as i32),
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
}
|
||||
}
|
||||
@@ -275,13 +260,6 @@ impl Node {
|
||||
);
|
||||
}
|
||||
|
||||
if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
|
||||
anyhow::bail!(
|
||||
"can't load node {}: must specify both gRPC address and port",
|
||||
np.node_id
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
id: NodeId(np.node_id as u64),
|
||||
// At startup we consider a node offline until proven otherwise.
|
||||
@@ -294,8 +272,6 @@ impl Node {
|
||||
listen_https_port: np.listen_https_port.map(|x| x as u16),
|
||||
listen_pg_addr: np.listen_pg_addr,
|
||||
listen_pg_port: np.listen_pg_port as u16,
|
||||
listen_grpc_addr: np.listen_grpc_addr,
|
||||
listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
|
||||
availability_zone_id: AvailabilityZone(np.availability_zone_id),
|
||||
use_https,
|
||||
cancel: CancellationToken::new(),
|
||||
@@ -385,8 +361,6 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2125,8 +2125,6 @@ pub(crate) struct NodePersistence {
|
||||
pub(crate) availability_zone_id: String,
|
||||
pub(crate) listen_https_port: Option<i32>,
|
||||
pub(crate) lifecycle: String,
|
||||
pub(crate) listen_grpc_addr: Option<String>,
|
||||
pub(crate) listen_grpc_port: Option<i32>,
|
||||
}
|
||||
|
||||
/// Tenant metadata health status that are stored durably.
|
||||
|
||||
@@ -5,7 +5,7 @@ use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use crate::metrics::SafekeeperRequestLabelGroup;
|
||||
use crate::metrics::PageserverRequestLabelGroup;
|
||||
|
||||
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
@@ -19,8 +19,8 @@ pub(crate) struct SafekeeperClient {
|
||||
|
||||
macro_rules! measured_request {
|
||||
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
|
||||
let labels = SafekeeperRequestLabelGroup {
|
||||
safekeeper_id: $node_id,
|
||||
let labels = PageserverRequestLabelGroup {
|
||||
pageserver_id: $node_id,
|
||||
path: $name,
|
||||
method: $method,
|
||||
};
|
||||
@@ -35,7 +35,7 @@ macro_rules! measured_request {
|
||||
if res.is_err() {
|
||||
let error_counters = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safekeeper_request_error;
|
||||
.storage_controller_pageserver_request_error;
|
||||
error_counters.inc(labels)
|
||||
}
|
||||
|
||||
|
||||
@@ -945,8 +945,6 @@ pub(crate) mod test_utils {
|
||||
None,
|
||||
format!("pghost-{i}"),
|
||||
5432 + i as u16,
|
||||
Some(format!("grpchost-{i}")),
|
||||
Some(51051 + i as u16),
|
||||
az_iter
|
||||
.next()
|
||||
.cloned()
|
||||
|
||||
@@ -34,8 +34,6 @@ diesel::table! {
|
||||
availability_zone_id -> Varchar,
|
||||
listen_https_port -> Nullable<Int4>,
|
||||
lifecycle -> Varchar,
|
||||
listen_grpc_addr -> Nullable<Varchar>,
|
||||
listen_grpc_port -> Nullable<Int4>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1683,8 +1683,6 @@ impl Service {
|
||||
None,
|
||||
"".to_string(),
|
||||
123,
|
||||
None,
|
||||
None,
|
||||
AvailabilityZone("test_az".to_string()),
|
||||
false,
|
||||
)
|
||||
@@ -2227,8 +2225,19 @@ impl Service {
|
||||
&self,
|
||||
reattach_req: ReAttachRequest,
|
||||
) -> Result<ReAttachResponse, ApiError> {
|
||||
let mut _node_lock: Option<TracingExclusiveGuard<NodeOperations>> = None;
|
||||
|
||||
if let Some(register_req) = reattach_req.register {
|
||||
self.node_register(register_req).await?;
|
||||
_node_lock = Some(
|
||||
trace_exclusive_lock(
|
||||
&self.node_op_locks,
|
||||
register_req.node_id,
|
||||
NodeOperations::Register,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
self.node_register_with_lock(register_req, _node_lock.as_ref().unwrap())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Ordering: we must persist generation number updates before making them visible in the in-memory state
|
||||
@@ -7164,13 +7173,21 @@ impl Service {
|
||||
&self,
|
||||
register_req: NodeRegisterRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
let _node_lock = trace_exclusive_lock(
|
||||
let node_lock = trace_exclusive_lock(
|
||||
&self.node_op_locks,
|
||||
register_req.node_id,
|
||||
NodeOperations::Register,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.node_register_with_lock(register_req, &node_lock).await
|
||||
}
|
||||
|
||||
async fn node_register_with_lock(
|
||||
&self,
|
||||
register_req: NodeRegisterRequest,
|
||||
_node_lock: &TracingExclusiveGuard<NodeOperations>,
|
||||
) -> Result<(), ApiError> {
|
||||
#[derive(PartialEq)]
|
||||
enum RegistrationStatus {
|
||||
UpToDate,
|
||||
@@ -7256,12 +7273,6 @@ impl Service {
|
||||
));
|
||||
}
|
||||
|
||||
if register_req.listen_grpc_addr.is_some() != register_req.listen_grpc_port.is_some() {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"must specify both gRPC address and port"
|
||||
)));
|
||||
}
|
||||
|
||||
// Ordering: we must persist the new node _before_ adding it to in-memory state.
|
||||
// This ensures that before we use it for anything or expose it via any external
|
||||
// API, it is guaranteed to be available after a restart.
|
||||
@@ -7272,8 +7283,6 @@ impl Service {
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_port,
|
||||
register_req.availability_zone_id.clone(),
|
||||
self.config.use_https_pageserver_api,
|
||||
);
|
||||
|
||||
@@ -1184,19 +1184,11 @@ impl TenantShard {
|
||||
for secondary in self.intent.get_secondary() {
|
||||
// Make sure we don't try to migrate a secondary to our attached location: this case happens
|
||||
// easily in environments without multiple AZs.
|
||||
let mut exclude = match self.intent.attached {
|
||||
let exclude = match self.intent.attached {
|
||||
Some(attached) => vec![attached],
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
// Exclude all other secondaries from the scheduling process to avoid replacing
|
||||
// one existing secondary with another existing secondary.
|
||||
for another_secondary in self.intent.secondary.iter() {
|
||||
if another_secondary != secondary {
|
||||
exclude.push(*another_secondary);
|
||||
}
|
||||
}
|
||||
|
||||
let replacement = match &self.policy {
|
||||
PlacementPolicy::Attached(_) => {
|
||||
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
|
||||
@@ -1356,19 +1348,28 @@ impl TenantShard {
|
||||
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
|
||||
/// funciton should not be used to decide whether to reconcile.
|
||||
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
|
||||
// We have an intent to attach for this node
|
||||
let attach_intent = self.intent.attached?;
|
||||
// We have an observed state for this node
|
||||
let location = self.observed.locations.get(&attach_intent)?;
|
||||
// Our observed state is not None, i.e. not in flux
|
||||
let location_config = location.conf.as_ref()?;
|
||||
|
||||
// Check if our intent and observed state agree that this node is in an attached state.
|
||||
match location_config.mode {
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => Some(attach_intent),
|
||||
_ => None,
|
||||
if let Some(attach_intent) = self.intent.attached {
|
||||
match self.observed.locations.get(&attach_intent) {
|
||||
Some(loc) => match &loc.conf {
|
||||
Some(conf) => match conf.mode {
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// Our intent and observed state agree that this node is in an attached state.
|
||||
Some(attach_intent)
|
||||
}
|
||||
// Our observed config is not an attached state
|
||||
_ => None,
|
||||
},
|
||||
// Our observed state is None, i.e. in flux
|
||||
None => None,
|
||||
},
|
||||
// We have no observed state for this node
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
// Our intent is not to attach
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -69,11 +69,6 @@ def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
|
||||
).value
|
||||
== i + 1
|
||||
)
|
||||
# There should be only one basebackup file in the cache.
|
||||
assert metrics.query_one("pageserver_basebackup_cache_entries_total").value == 1
|
||||
# The size of one basebackup for new DB is ~20KB.
|
||||
size_bytes = metrics.query_one("pageserver_basebackup_cache_size_bytes").value
|
||||
assert 10 * 1024 <= size_bytes <= 100 * 1024
|
||||
|
||||
wait_until(check_metrics)
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
Safekeeper,
|
||||
StorageControllerApiException,
|
||||
flush_ep_to_pageserver,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
@@ -128,12 +127,6 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
|
||||
reason="CHECK_ONDISK_DATA_COMPATIBILITY env is not set",
|
||||
)
|
||||
|
||||
skip_old_debug_versions = pytest.mark.skipif(
|
||||
os.getenv("BUILD_TYPE", "debug") == "debug"
|
||||
and os.getenv("DEFAULT_PG_VERSION") in [PgVersion.V14, PgVersion.V15, PgVersion.V16],
|
||||
reason="compatibility snaphots not available for old versions of debug builds",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(before="test_forward_compatibility")
|
||||
@@ -204,7 +197,6 @@ ingest_lag_log_line = ".*ingesting record with timestamp lagging more than wait_
|
||||
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@skip_old_debug_versions
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
def test_backward_compatibility(
|
||||
@@ -232,7 +224,6 @@ def test_backward_compatibility(
|
||||
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@skip_old_debug_versions
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
def test_forward_compatibility(
|
||||
@@ -302,20 +293,7 @@ def test_forward_compatibility(
|
||||
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
|
||||
ep = env.endpoints.create("main")
|
||||
ep_env = {"LD_LIBRARY_PATH": str(env.pg_distrib_dir / f"v{env.pg_version}/lib")}
|
||||
|
||||
# If the compatibility snapshot was created with --timelines-onto-safekeepers=false,
|
||||
# we should not pass safekeeper_generation to the endpoint because the compute
|
||||
# will not be able to start.
|
||||
# Zero generation is INVALID_GENERATION.
|
||||
generation = 0
|
||||
try:
|
||||
res = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
generation = res["generation"]
|
||||
except StorageControllerApiException as e:
|
||||
if e.status_code != 404 or not re.search(r"Timeline .* not found", str(e)):
|
||||
raise e
|
||||
|
||||
ep.start(env=ep_env, safekeeper_generation=generation)
|
||||
ep.start(env=ep_env)
|
||||
|
||||
connstr = ep.connstr()
|
||||
|
||||
@@ -365,7 +343,7 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
|
||||
)
|
||||
|
||||
# Timeline exists again: restart the endpoint
|
||||
ep.start(env=ep_env, safekeeper_generation=generation)
|
||||
ep.start(env=ep_env)
|
||||
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
|
||||
@@ -615,7 +593,6 @@ def test_historic_storage_formats(
|
||||
|
||||
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@skip_old_debug_versions
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.parametrize(
|
||||
**fixtures.utils.allpairs_versions(),
|
||||
|
||||
@@ -306,7 +306,13 @@ def test_sql_regress(
|
||||
)
|
||||
|
||||
# Connect to postgres and create a database called "regression".
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
# Enable the test mode, so that we don't need to patch the test cases.
|
||||
"neon.regress_test_mode = true",
|
||||
],
|
||||
)
|
||||
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
|
||||
|
||||
# Create some local directories for pg_regress to run in.
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
create or replace function admin_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'admin event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
create role neon_superuser;
|
||||
create role neon_admin login inherit createrole createdb in role neon_superuser;
|
||||
grant create on schema public to neon_admin;
|
||||
create database neondb with owner neon_admin;
|
||||
grant all privileges on database neondb to neon_superuser;
|
||||
create role neon_user;
|
||||
grant create on schema public to neon_user;
|
||||
create event trigger on_ddl1 on ddl_command_end
|
||||
execute procedure admin_proc();
|
||||
set role neon_user;
|
||||
-- check that non-privileged user can not change neon.event_triggers
|
||||
set neon.event_triggers to false;
|
||||
ERROR: permission denied to set neon.event_triggers
|
||||
DETAIL: Only "neon_superuser" is allowed to set the GUC
|
||||
-- Non-privileged neon user should not be able to create event trigers
|
||||
create event trigger on_ddl2 on ddl_command_end
|
||||
execute procedure admin_proc();
|
||||
ERROR: permission denied to create event trigger "on_ddl2"
|
||||
HINT: Must be superuser to create an event trigger.
|
||||
set role neon_admin;
|
||||
-- neon_superuser should be able to create event trigers
|
||||
create or replace function neon_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neon event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
NOTICE: admin event trigger is executed for neon_admin
|
||||
create event trigger on_ddl2 on ddl_command_end
|
||||
execute procedure neon_proc();
|
||||
\c neondb neon_admin
|
||||
create or replace function neondb_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neondb event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
create or replace function neondb_secdef_proc()
|
||||
returns event_trigger
|
||||
language plpgsql
|
||||
SECURITY DEFINER
|
||||
as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neondb secdef event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
-- neon_admin (neon_superuser member) should be able to create event triggers
|
||||
create event trigger on_ddl3 on ddl_command_end
|
||||
execute procedure neondb_proc();
|
||||
create event trigger on_ddl4 on ddl_command_end
|
||||
execute procedure neondb_secdef_proc();
|
||||
-- Check that event trigger is fired for neon_admin
|
||||
create table t1(x integer);
|
||||
NOTICE: neondb event trigger is executed for neon_admin
|
||||
NOTICE: neondb secdef event trigger is executed for neon_admin
|
||||
-- Check that event trigger can be skipped
|
||||
set neon.event_triggers to false;
|
||||
create table t2(x integer);
|
||||
WARNING: Skipping Event Trigger: neon.event_triggers is false
|
||||
WARNING: Skipping Event Trigger: neon.event_triggers is false
|
||||
\c regression cloud_admin
|
||||
-- Check that event triggers are not fired for superuser
|
||||
create table t3(x integer);
|
||||
NOTICE: admin event trigger is executed for cloud_admin
|
||||
WARNING: Skipping Event Trigger
|
||||
DETAIL: Event Trigger function "neon_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
|
||||
\c neondb cloud_admin
|
||||
-- Check that user-defined event triggers are not fired for superuser
|
||||
create table t4(x integer);
|
||||
WARNING: Skipping Event Trigger
|
||||
DETAIL: Event Trigger function "neondb_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
|
||||
WARNING: Skipping Event Trigger
|
||||
DETAIL: Event Trigger function "neondb_secdef_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
|
||||
\c neondb neon_admin
|
||||
-- Check that neon_admin can drop event triggers
|
||||
drop event trigger on_ddl3;
|
||||
drop event trigger on_ddl4;
|
||||
@@ -9,4 +9,3 @@ test: neon-rel-truncate
|
||||
test: neon-clog
|
||||
test: neon-test-utils
|
||||
test: neon-vacuum-full
|
||||
test: neon-event-triggers
|
||||
|
||||
@@ -1,96 +0,0 @@
|
||||
create or replace function admin_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'admin event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
|
||||
create role neon_superuser;
|
||||
create role neon_admin login inherit createrole createdb in role neon_superuser;
|
||||
grant create on schema public to neon_admin;
|
||||
create database neondb with owner neon_admin;
|
||||
grant all privileges on database neondb to neon_superuser;
|
||||
|
||||
create role neon_user;
|
||||
grant create on schema public to neon_user;
|
||||
|
||||
create event trigger on_ddl1 on ddl_command_end
|
||||
execute procedure admin_proc();
|
||||
|
||||
set role neon_user;
|
||||
|
||||
-- check that non-privileged user can not change neon.event_triggers
|
||||
set neon.event_triggers to false;
|
||||
|
||||
-- Non-privileged neon user should not be able to create event trigers
|
||||
create event trigger on_ddl2 on ddl_command_end
|
||||
execute procedure admin_proc();
|
||||
|
||||
set role neon_admin;
|
||||
|
||||
-- neon_superuser should be able to create event trigers
|
||||
create or replace function neon_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neon event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
|
||||
create event trigger on_ddl2 on ddl_command_end
|
||||
execute procedure neon_proc();
|
||||
|
||||
\c neondb neon_admin
|
||||
|
||||
create or replace function neondb_proc()
|
||||
returns event_trigger
|
||||
language plpgsql as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neondb event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
|
||||
create or replace function neondb_secdef_proc()
|
||||
returns event_trigger
|
||||
language plpgsql
|
||||
SECURITY DEFINER
|
||||
as
|
||||
$$
|
||||
begin
|
||||
raise notice 'neondb secdef event trigger is executed for %', current_user;
|
||||
end;
|
||||
$$;
|
||||
|
||||
-- neon_admin (neon_superuser member) should be able to create event triggers
|
||||
create event trigger on_ddl3 on ddl_command_end
|
||||
execute procedure neondb_proc();
|
||||
|
||||
create event trigger on_ddl4 on ddl_command_end
|
||||
execute procedure neondb_secdef_proc();
|
||||
|
||||
-- Check that event trigger is fired for neon_admin
|
||||
create table t1(x integer);
|
||||
|
||||
-- Check that event trigger can be skipped
|
||||
set neon.event_triggers to false;
|
||||
create table t2(x integer);
|
||||
|
||||
\c regression cloud_admin
|
||||
|
||||
-- Check that event triggers are not fired for superuser
|
||||
create table t3(x integer);
|
||||
|
||||
\c neondb cloud_admin
|
||||
|
||||
-- Check that user-defined event triggers are not fired for superuser
|
||||
create table t4(x integer);
|
||||
|
||||
\c neondb neon_admin
|
||||
|
||||
-- Check that neon_admin can drop event triggers
|
||||
drop event trigger on_ddl3;
|
||||
drop event trigger on_ddl4;
|
||||
Reference in New Issue
Block a user