Compare commits

...

14 Commits

Author SHA1 Message Date
Alexander Bayandin
7d175a92d6 DO NOT MERGE: run only 300gb benchmark 2023-11-07 17:48:30 +00:00
Alexander Bayandin
c1748cfe19 Add 300gb benchmark 2023-11-07 17:45:07 +00:00
Alexander Bayandin
e19b543ea2 Nightly Benchmark: simplify matrix 2023-11-07 17:44:38 +00:00
duguorong009
11d9d801b5 pageserver: improve the shutdown log error (#5792)
## Problem
- Close #5784 

## Summary of changes
- Update the `GetActiveTenantError` -> `QueryError` conversion process
in `pageserver/src/page_service.rs`
- Update the pytest logging exceptions in
`./test_runner/regress/test_tenant_detach.py`
2023-11-07 16:57:26 +00:00
Andrew Rudenko
fc47af156f Passing neon options to the console (#5781)
The idea is to pass neon_* prefixed options to control plane. It can be
used by cplane to dynamically create timelines and computes. Such
options also should be excluded from passing to compute. Another issue
is how connection caching is working now, because compute's instance now
depends not only on hostname but probably on such options too I included
them to cache key.
2023-11-07 16:49:26 +01:00
Arpad Müller
e310533ed3 Support JWT key reload in pageserver (#5594)
## Problem

For quickly rotating JWT secrets, we want to be able to reload the JWT
public key file in the pageserver, and also support multiple JWT keys.

See #4897.

## Summary of changes

* Allow directories for the `auth_validation_public_key_path` config
param instead of just files. for the safekeepers, all of their config options
also support multiple JWT keys.
* For the pageservers, make the JWT public keys easily globally swappable
by using the `arc-swap` crate.
* Add an endpoint to the pageserver, triggered by a POST to
`/v1/reload_auth_validation_keys`, that reloads the JWT public keys from
the pre-configured path (for security reasons, you cannot upload any
keys yourself).

Fixes #4897

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-07 15:43:29 +01:00
John Spray
1d68f52b57 pageserver: move deletion failpoint inside backoff (#5814)
## Problem

When enabled, this failpoint would busy-spin in a loop that emits log
messages.

## Summary of changes

Move the failpoint inside a backoff::exponential block: it will still
spam the log, but at much lower rate.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-07 14:25:51 +00:00
Alexander Bayandin
4cd47b7d4b Dockerfile: Set BUILD_TAG for storage services (#5812)
## Problem

https://github.com/neondatabase/neon/pull/5576 added `build-tag`
reporting to `libmetrics_build_info`, but it's not reported because we
didn't set the corresponding env variable in the build process.

## Summary of changes
- Add `BUILD_TAG` env var while building services
2023-11-07 13:45:59 +00:00
Fernando Luz
0141c95788 build: Add warning when missing postgres submodule during the build (#5614)
I forked the project and in my local repo, I wasn't able to compile the
project and in my search, I found the solution in neon forum. After a PR
discussion, I made a change in the makefile to alert the missing `git
submodules update` step.

---------

Signed-off-by: Fernando Luz <prof.fernando.luz@gmail.com>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-07 12:13:05 +00:00
Shany Pozin
0ac4cf67a6 Use self.tenants instead of TENANTS (#5811) 2023-11-07 11:38:02 +00:00
Joonas Koivunen
4be6bc7251 refactor: remove unnecessary unsafe (#5802)
unsafe impls for `Send` and `Sync` should not be added by default. in
the case of `SlotGuard` removing them does not cause any issues, as the
compiler automatically derives those.

This PR adds requirement to document the unsafety (see
[clippy::undocumented_unsafe_blocks]) and opportunistically adds
`#![deny(unsafe_code)]` to most places where we don't have unsafe code
right now.

TRPL on Send and Sync:
https://doc.rust-lang.org/book/ch16-04-extensible-concurrency-sync-and-send.html

[clippy::undocumented_unsafe_blocks]:
https://rust-lang.github.io/rust-clippy/master/#/undocumented_unsafe_blocks
2023-11-07 10:26:25 +00:00
John Spray
a394f49e0d pageserver: avoid converting an error to anyhow::Error (#5803)
This was preventing it getting cleanly converted to a
CalculateLogicalSizeError::Cancelled, resulting in "Logical size
calculation failed" errors in logs.
2023-11-07 09:35:45 +00:00
John Spray
c00651ff9b pageserver: start refactoring into TenantManager (#5797)
## Problem

See: https://github.com/neondatabase/neon/issues/5796

## Summary of changes

Completing the refactor is quite verbose and can be done in stages: each
interface that is currently called directly from a top-level mgr.rs
function can be moved into TenantManager once the relevant subsystems
have access to it.

Landing the initial change to create of TenantManager is useful because
it enables new code to use it without having to be altered later, and
sets us up to incrementally fix the existing code to use an explicit
Arc<TenantManager> instead of relying on the static TENANTS.
2023-11-07 09:06:53 +00:00
Richy Wang
bea8efac24 Fix comments in 'receive_wal.rs'. (#5807)
## Problem
Some comments in 'receive_wal.rs' is not suitable. It may copy from
'send_wal.rs' and leave it unchanged.
## Summary of changes
This commit fixes two comments in the code:
Changed "/// Unregister walsender." to "/// Unregister walreceiver."
Changed "///Scope guard to access slot in WalSenders registry" to
"///Scope guard to access slot in WalReceivers registry."
2023-11-07 09:13:01 +01:00
56 changed files with 662 additions and 681 deletions

View File

@@ -34,76 +34,6 @@ concurrency:
cancel-in-progress: true
jobs:
bench:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
@@ -116,66 +46,18 @@ jobs:
runs-on: ubuntu-latest
outputs:
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
tpch-compare-matrix: ${{ steps.tpch-compare-matrix.outputs.matrix }}
steps:
- name: Generate matrix for pgbench benchmark
id: pgbench-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-new",
"neon-captest-reuse",
"neonvm-captest-new"
],
"db_size": [ "10gb" ],
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" },
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "platform": "neonvm-captest-new", "db_size": "50gb" }]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
{ "platform": "rds-aurora", "db_size": "50gb"}]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
- name: Generate matrix for OLAP benchmarks
id: olap-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-reuse"
"include": [
{ "platform": "neon-captest-new", "db_size": "300gb", "compute_units": "[7, 7]", "provisioner": "k8s-pod" },
{ "platform": "neonvm-captest-new", "db_size": "300gb", "compute_units": "[7, 7]", "provisioner": "k8s-neonvm" }
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
{ "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
- name: Generate matrix for TPC-H benchmarks
id: tpch-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-reuse"
],
"scale": [
"10"
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
{ "platform": "rds-aurora", "scale": "10" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
pgbench-compare:
@@ -226,8 +108,8 @@ jobs:
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}
compute_units: ${{ matrix.compute_units }}
provisioner: ${{ matrix.provisioner }}
- name: Set up Connection String
id: set-up-connstr
@@ -317,293 +199,3 @@ jobs:
slack-message: "Periodic perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
clickbench-compare:
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
# we use for performance testing in pgbench-compare.
# Run this job only when pgbench-compare is finished to avoid the intersection.
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: ${{ !cancelled() }}
needs: [ generate-matrices, pgbench-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CLICKBENCH_10M_CONNSTR }}
;;
rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
fi
psql ${CONNSTR} -c "${QUERY}"
- name: ClickBench benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_clickbench
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
TEST_OLAP_SCALE: 10
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic OLAP perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
tpch-compare:
# TCP-H DB for rds-aurora and rds-Postgres deployed to the same clusters
# we use for performance testing in pgbench-compare & clickbench-compare.
# Run this job only when clickbench-compare is finished to avoid the intersection.
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: ${{ !cancelled() }}
needs: [ generate-matrices, clickbench-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.tpch-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
TEST_OLAP_SCALE: ${{ matrix.scale }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Get Connstring Secret Name
run: |
case "${PLATFORM}" in
neon-captest-reuse)
ENV_PLATFORM=CAPTEST_TPCH
;;
rds-aurora)
ENV_PLATFORM=RDS_AURORA_TPCH
;;
rds-postgres)
ENV_PLATFORM=RDS_AURORA_TPCH
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
CONNSTR_SECRET_NAME="BENCHMARK_${ENV_PLATFORM}_S${TEST_OLAP_SCALE}_CONNSTR"
echo "CONNSTR_SECRET_NAME=${CONNSTR_SECRET_NAME}" >> $GITHUB_ENV
- name: Set up Connection String
id: set-up-connstr
run: |
CONNSTR=${{ secrets[env.CONNSTR_SECRET_NAME] }}
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
fi
psql ${CONNSTR} -c "${QUERY}"
- name: Run TPC-H benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_tpch
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
TEST_OLAP_SCALE: ${{ matrix.scale }}
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic TPC-H perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
user-examples-compare:
if: ${{ !cancelled() }}
needs: [ generate-matrices, tpch-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_AURORA_CONNSTR }}
;;
rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
fi
psql ${CONNSTR} -c "${QUERY}"
- name: Run user examples
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_user_examples
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic User example perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -723,6 +723,7 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{ needs.tag.outputs.build-tag }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}

7
Cargo.lock generated
View File

@@ -170,6 +170,12 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
version = "0.5.0"
@@ -5951,6 +5957,7 @@ name = "utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-trait",
"bincode",
"byteorder",

View File

@@ -36,6 +36,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
azure_core = "0.16"
azure_identity = "0.16"

View File

@@ -27,6 +27,7 @@ RUN set -e \
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
@@ -78,9 +79,9 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/

View File

@@ -72,6 +72,10 @@ neon: postgres-headers walproposer-lib
#
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \

View File

@@ -1,7 +1,7 @@
//!
//! Various tools and helpers to handle cluster / compute node (Postgres)
//! configuration.
//!
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checker;
pub mod config;
pub mod configurator;

View File

@@ -262,7 +262,7 @@ where
P: Into<Utf8PathBuf>,
{
let path: Utf8PathBuf = path.into();
// SAFETY
// SAFETY:
// pre_exec is marked unsafe because it runs between fork and exec.
// Why is that dangerous in various ways?
// Long answer: https://github.com/rust-lang/rust/issues/39575

View File

@@ -1,11 +1,10 @@
//
// Local control plane.
//
// Can start, configure and stop postgres instances running as a local processes.
//
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
//! Local control plane.
//!
//! Can start, configure and stop postgres instances running as a local processes.
//!
//! Intended to be used in integration tests and in CLI tools for
//! local installations.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod attachment_service;
mod background_process;

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod requests;
pub mod responses;
pub mod spec;

View File

@@ -1,6 +1,6 @@
//!
//! Shared code for consumption metics collection
//!
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::{Deserialize, Serialize};

View File

@@ -2,6 +2,7 @@
//! make sure that we use the same dep version everywhere.
//! Otherwise, we might not see all metrics registered via
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types

View File

@@ -2,6 +2,8 @@
//! To use, create PostgresBackend and run() it, passing the Handler
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
use futures::pin_mut;

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;

View File

@@ -8,6 +8,7 @@
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
#![deny(clippy::undocumented_unsafe_blocks)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
@@ -20,6 +21,7 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(clippy::undocumented_unsafe_blocks)]
use serde::{Deserialize, Serialize};
include!(concat!(

View File

@@ -1,6 +1,7 @@
//! Postgres protocol messages serialization-deserialization. See
//! <https://www.postgresql.org/docs/devel/protocol-message-formats.html>
//! on message formats.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod framed;

View File

@@ -6,6 +6,8 @@
//! * [`s3_bucket`] uses AWS S3 bucket as an external storage
//! * [`azure_blob`] allows to use Azure Blob storage as an external storage
//!
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod local_fs;

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types

View File

@@ -1,4 +1,6 @@
//! Synthetic size calculation
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod calculation;
pub mod svg;

View File

@@ -32,6 +32,8 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true

View File

@@ -1,7 +1,8 @@
// For details about authentication see docs/authentication.md
use arc_swap::ArcSwap;
use serde;
use std::fs;
use std::{fs, sync::Arc};
use anyhow::Result;
use camino::Utf8Path;
@@ -44,31 +45,88 @@ impl Claims {
}
}
pub struct SwappableJwtAuth(ArcSwap<JwtAuth>);
impl SwappableJwtAuth {
pub fn new(jwt_auth: JwtAuth) -> Self {
SwappableJwtAuth(ArcSwap::new(Arc::new(jwt_auth)))
}
pub fn swap(&self, jwt_auth: JwtAuth) {
self.0.swap(Arc::new(jwt_auth));
}
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
self.0.load().decode(token)
}
}
impl std::fmt::Debug for SwappableJwtAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Swappable({:?})", self.0.load())
}
}
pub struct JwtAuth {
decoding_key: DecodingKey,
decoding_keys: Vec<DecodingKey>,
validation: Validation,
}
impl JwtAuth {
pub fn new(decoding_key: DecodingKey) -> Self {
pub fn new(decoding_keys: Vec<DecodingKey>) -> Self {
let mut validation = Validation::default();
validation.algorithms = vec![STORAGE_TOKEN_ALGORITHM];
// The default 'required_spec_claims' is 'exp'. But we don't want to require
// expiration.
validation.required_spec_claims = [].into();
Self {
decoding_key,
decoding_keys,
validation,
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
let public_key = fs::read(key_path)?;
Ok(Self::new(DecodingKey::from_ed_pem(&public_key)?))
let metadata = key_path.metadata()?;
let decoding_keys = if metadata.is_dir() {
let mut keys = Vec::new();
for entry in fs::read_dir(key_path)? {
let path = entry?.path();
if !path.is_file() {
// Ignore directories (don't recurse)
continue;
}
let public_key = fs::read(path)?;
keys.push(DecodingKey::from_ed_pem(&public_key)?);
}
keys
} else if metadata.is_file() {
let public_key = fs::read(key_path)?;
vec![DecodingKey::from_ed_pem(&public_key)?]
} else {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
}
Ok(Self::new(decoding_keys))
}
/// Attempt to decode the token with the internal decoding keys.
///
/// The function tries the stored decoding keys in succession,
/// and returns the first yielding a successful result.
/// If there is no working decoding key, it returns the last error.
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
Ok(decode(token, &self.decoding_key, &self.validation)?)
let mut res = None;
for decoding_key in &self.decoding_keys {
res = Some(decode(token, decoding_key, &self.validation));
if let Some(Ok(res)) = res {
return Ok(res);
}
}
if let Some(res) = res {
res.map_err(anyhow::Error::new)
} else {
anyhow::bail!("no JWT decoding keys configured")
}
}
}
@@ -129,7 +187,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJleHAiOjE3MDkyMDA4NzksImlhdCI6MTY3ODQ0MjQ3OX0.U3eA8j-uU-JnhzeO3EDHRuXLwkAUFCPxtGHEgw6p7Ccc3YRbFs2tmCdbD9PZEXP-XsxSeBQi1FY0YPcT3NXADw";
// Check it can be validated with the public key
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?]);
let claims_from_token = auth.decode(encoded_eddsa)?.claims;
assert_eq!(claims_from_token, expected_claims);
@@ -146,7 +204,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519)?;
// decode it back
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?]);
let decoded = auth.decode(&encoded)?;
assert_eq!(decoded.claims, claims);

View File

@@ -1,4 +1,4 @@
use crate::auth::{Claims, JwtAuth};
use crate::auth::{Claims, SwappableJwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION};
@@ -389,7 +389,7 @@ fn parse_token(header_value: &str) -> Result<&str, ApiError> {
}
pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
provide_auth: fn(&Request<Body>) -> Option<&JwtAuth>,
provide_auth: fn(&Request<Body>) -> Option<&SwappableJwtAuth>,
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
if let Some(auth) = provide_auth(&req) {

View File

@@ -120,6 +120,8 @@ impl Id {
chunk[0] = HEX[((b >> 4) & 0xf) as usize];
chunk[1] = HEX[(b & 0xf) as usize];
}
// SAFETY: vec constructed out of `HEX`, it can only be ascii
unsafe { String::from_utf8_unchecked(buf) }
}
}

View File

@@ -1,5 +1,6 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod backoff;

View File

@@ -1,6 +1,7 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) {
pub fn exit_now(code: u8) -> ! {
// SAFETY: exiting is safe, the ffi is not safe
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]
use anyhow::Context;

View File

@@ -34,8 +34,11 @@ use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::JwtAuth, logging, project_build_tag, project_git_version, sentry_init::init_sentry,
signals::Signal, tcp_listener,
auth::{JwtAuth, SwappableJwtAuth},
logging, project_build_tag, project_git_version,
sentry_init::init_sentry,
signals::Signal,
tcp_listener,
};
project_git_version!(GIT_VERSION);
@@ -321,13 +324,12 @@ fn start_pageserver(
let http_auth;
let pg_auth;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
// unwrap is ok because check is performed when creating config, so path is set and file exists
// unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!(
"Loading public key for verifying JWT tokens from {:#?}",
key_path
);
let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
let jwt_auth = JwtAuth::from_key_path(key_path)?;
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
http_auth = match &conf.http_auth_type {
AuthType::Trust => None,
@@ -410,7 +412,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -420,6 +422,7 @@ fn start_pageserver(
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
@@ -548,6 +551,7 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
tenant_manager,
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),

View File

@@ -161,7 +161,7 @@ pub struct PageServerConf {
pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute
pub pg_auth_type: AuthType,
/// Path to a file containing public key for verifying JWT tokens.
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>,

View File

@@ -55,21 +55,24 @@ impl Deleter {
/// Wrap the remote `delete_objects` with a failpoint
async fn remote_delete(&self) -> Result<(), anyhow::Error> {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|| async {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
});
self.remote_storage.delete_objects(&self.accumulator).await
},
|_| false,
3,
10,

View File

@@ -52,6 +52,31 @@ paths:
schema:
type: object
/v1/reload_auth_validation_keys:
post:
description: Reloads the JWT public keys from their pre-configured location on disk.
responses:
"200":
description: The reload completed successfully.
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error (also hits if no keys were found)
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}:
parameters:
- name: tenant_id

View File

@@ -20,6 +20,7 @@ use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -35,8 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
@@ -45,7 +46,7 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSha
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use utils::{
auth::JwtAuth,
auth::SwappableJwtAuth,
generation::Generation,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
@@ -63,7 +64,8 @@ use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
@@ -74,7 +76,8 @@ pub struct State {
impl State {
pub fn new(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
@@ -86,6 +89,7 @@ impl State {
.collect::<Vec<_>>();
Ok(Self {
conf,
tenant_manager,
auth,
allowlist_routes,
remote_storage,
@@ -389,6 +393,32 @@ async fn status_handler(
json_response(StatusCode::OK, StatusResponse { id: config.id })
}
async fn reload_auth_validation_keys_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let config = get_config(&request);
let state = get_state(&request);
let Some(shared_auth) = &state.auth else {
return json_response(StatusCode::BAD_REQUEST, ());
};
// unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
match JwtAuth::from_key_path(key_path) {
Ok(new_auth) => {
shared_auth.swap(new_auth);
json_response(StatusCode::OK, ())
}
Err(e) => {
warn!("Error reloading public keys from {key_path:?}: {e:}");
json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
}
}
}
async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -1140,20 +1170,14 @@ async fn put_tenant_location_config_handler(
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
state
.tenant_manager
.upsert_location(tenant_id, location_conf, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
@@ -1695,7 +1719,7 @@ where
pub fn make_router(
state: Arc<State>,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
auth: Option<Arc<SwappableJwtAuth>>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -1724,6 +1748,9 @@ pub fn make_router(
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)
})
.post("/v1/reload_auth_validation_keys", |r| {
api_handler(r, reload_auth_validation_keys_handler)
})
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))

View File

@@ -1,3 +1,5 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod auth;
pub mod basebackup;
pub mod config;

View File

@@ -14,6 +14,7 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -39,7 +40,7 @@ use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
auth::{Claims, JwtAuth, Scope},
auth::{Claims, Scope, SwappableJwtAuth},
id::{TenantId, TimelineId},
lsn::Lsn,
simple_rcu::RcuReadGuard,
@@ -121,7 +122,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
auth: Option<Arc<SwappableJwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
listener_ctx: RequestContext,
@@ -189,7 +190,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
connection_ctx: RequestContext,
@@ -252,7 +253,7 @@ async fn page_service_conn_main(
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
/// The context created for the lifetime of the connection
@@ -266,7 +267,7 @@ impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
auth: Option<Arc<SwappableJwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
@@ -1330,6 +1331,9 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
}
e => QueryError::Other(anyhow::anyhow!(e)),
}
}

View File

@@ -589,11 +589,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}

View File

@@ -200,6 +200,22 @@ async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
Ok(())
}
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
/// and attached modes concurrently.
pub struct TenantManager {
conf: &'static PageServerConf,
// TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
// out of that static variable, the TenantManager can own this.
// See https://github.com/neondatabase/neon/issues/5796
tenants: &'static std::sync::RwLock<TenantsMap>,
resources: TenantSharedResources,
}
fn emergency_generations(
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
) -> HashMap<TenantId, Generation> {
@@ -366,7 +382,7 @@ pub async fn init_tenant_mgr(
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
) -> anyhow::Result<TenantManager> {
let mut tenants = HashMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -468,7 +484,12 @@ pub async fn init_tenant_mgr(
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
METRICS.tenant_slots.set(tenants.len() as u64);
*tenants_map = TenantsMap::Open(tenants);
Ok(())
Ok(TenantManager {
conf,
tenants: &TENANTS,
resources,
})
}
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
@@ -742,139 +763,134 @@ pub(crate) async fn set_new_tenant_config(
Ok(())
}
#[instrument(skip_all, fields(%tenant_id))]
pub(crate) async fn upsert_location(
conf: &'static PageServerConf,
tenant_id: TenantId,
new_location_config: LocationConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
impl TenantManager {
#[instrument(skip_all, fields(%tenant_id))]
pub(crate) async fn upsert_location(
&self,
tenant_id: TenantId,
new_location_config: LocationConf,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existng tenant.
{
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
if attach_conf.generation == tenant.generation {
// A transition from Attached to Attached in the same generation, we may
// take our fast path and just provide the updated configuration
// to the tenant.
tenant.set_new_location_config(AttachedTenantConf::try_from(
new_location_config,
)?);
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existng tenant.
{
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
if attach_conf.generation == tenant.generation {
// A transition from Attached to Attached in the same generation, we may
// take our fast path and just provide the updated configuration
// to the tenant.
tenant.set_new_location_config(AttachedTenantConf::try_from(
new_location_config,
)?);
// Persist the new config in the background, to avoid holding up any
// locks while we do so.
// TODO
// Persist the new config in the background, to avoid holding up any
// locks while we do so.
// TODO
return Ok(());
} else {
// Different generations, fall through to general case
return Ok(());
} else {
// Different generations, fall through to general case
}
}
_ => {
// Not an Attached->Attached transition, fall through to general case
}
}
_ => {
// Not an Attached->Attached transition, fall through to general case
}
}
}
// General case for upserts to TenantsMap, excluding the case above: we will substitute an
// InProgress value to the slot while we make whatever changes are required. The state for
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
// General case for upserts to TenantsMap, excluding the case above: we will substitute an
// InProgress value to the slot while we make whatever changes are required. The state for
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
// The case where we keep a Tenant alive was covered above in the special case
// for Attached->Attached transitions in the same generation. By this point,
// if we see an attached tenant we know it will be discarded and should be
// shut down.
let (_guard, progress) = utils::completion::channel();
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
// The case where we keep a Tenant alive was covered above in the special case
// for Attached->Attached transitions in the same generation. By this point,
// if we see an attached tenant we know it will be discarded and should be
// shut down.
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
self.resources.deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
slot_guard.drop_old_value().expect("We just shut it down");
}
let tenant_path = self.conf.tenant_path(&tenant_id);
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
let tenant_path = self.conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
let timelines_path = self.conf.timelines_path(&tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
// re-attached, see https://github.com/neondatabase/neon/issues/5550
unsafe_create_dir_all(&timelines_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant = tenant_spawn(
self.conf,
tenant_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
None,
self.tenants,
SpawnMode::Normal,
ctx,
)?;
TenantSlot::Attached(tenant)
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
slot_guard.drop_old_value().expect("We just shut it down");
slot_guard.upsert(new_slot)?;
Ok(())
}
let tenant_path = conf.tenant_path(&tenant_id);
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
let tenant_path = conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
let timelines_path = conf.timelines_path(&tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
// re-attached, see https://github.com/neondatabase/neon/issues/5550
unsafe_create_dir_all(&timelines_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant = tenant_spawn(
conf,
tenant_id,
&tenant_path,
TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
},
AttachedTenantConf::try_from(new_location_config)?,
None,
&TENANTS,
SpawnMode::Normal,
ctx,
)?;
TenantSlot::Attached(tenant)
}
};
slot_guard.upsert(new_slot)?;
Ok(())
}
#[derive(Debug, thiserror::Error)]
@@ -1430,9 +1446,6 @@ pub struct SlotGuard {
_completion: utils::completion::Completion,
}
unsafe impl Send for SlotGuard {}
unsafe impl Sync for SlotGuard {}
impl SlotGuard {
fn new(
tenant_id: TenantId,

View File

@@ -596,21 +596,21 @@ trait CloseFileDescriptors: CommandExt {
impl<C: CommandExt> CloseFileDescriptors for C {
fn close_fds(&mut self) -> &mut Command {
// SAFETY: Code executed inside pre_exec should have async-signal-safety,
// which means it should be safe to execute inside a signal handler.
// The precise meaning depends on platform. See `man signal-safety`
// for the linux definition.
//
// The set_fds_cloexec_threadsafe function is documented to be
// async-signal-safe.
//
// Aside from this function, the rest of the code is re-entrant and
// doesn't make any syscalls. We're just passing constants.
//
// NOTE: It's easy to indirectly cause a malloc or lock a mutex,
// which is not async-signal-safe. Be careful.
unsafe {
self.pre_exec(move || {
// SAFETY: Code executed inside pre_exec should have async-signal-safety,
// which means it should be safe to execute inside a signal handler.
// The precise meaning depends on platform. See `man signal-safety`
// for the linux definition.
//
// The set_fds_cloexec_threadsafe function is documented to be
// async-signal-safe.
//
// Aside from this function, the rest of the code is re-entrant and
// doesn't make any syscalls. We're just passing constants.
//
// NOTE: It's easy to indirectly cause a malloc or lock a mutex,
// which is not async-signal-safe. Be careful.
close_fds::set_fds_cloexec_threadsafe(3, &[]);
Ok(())
})

View File

@@ -1,6 +1,8 @@
//! User credentials used in authentication.
use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
use crate::{
auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
@@ -38,6 +40,8 @@ pub struct ClientCredentials<'a> {
pub user: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<String>,
pub cache_key: String,
}
impl ClientCredentials<'_> {
@@ -53,6 +57,7 @@ impl<'a> ClientCredentials<'a> {
ClientCredentials {
user: "",
project: None,
cache_key: "".to_string(),
}
}
@@ -120,7 +125,17 @@ impl<'a> ClientCredentials<'a> {
info!(user, project = project.as_deref(), "credentials");
Ok(Self { user, project })
let cache_key = format!(
"{}{}",
project.as_deref().unwrap_or(""),
neon_options(params).unwrap_or("".to_string())
);
Ok(Self {
user,
project,
cache_key,
})
}
}
@@ -176,6 +191,7 @@ mod tests {
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
assert_eq!(creds.cache_key, "foo");
Ok(())
}
@@ -303,4 +319,23 @@ mod tests {
_ => panic!("bad error: {err:?}"),
}
}
#[test]
fn parse_neon_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "neon_lsn:0/2 neon_endpoint_type:read_write"),
]);
let sni = Some("project.localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(
creds.cache_key,
"projectneon_endpoint_type:read_write neon_lsn:0/2"
);
Ok(())
}
}

View File

@@ -3,6 +3,7 @@ use crate::{
cancellation::CancelClosure,
console::errors::WakeComputeError,
error::{io_error, UserFacingError},
proxy::is_neon_param,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
@@ -278,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none())
.filter(|opt| parse_endpoint_param(opt).is_none() && !is_neon_param(opt))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
@@ -313,5 +314,11 @@ mod tests {
let params = StartupMessageParams::new([("options", "project = foo")]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
let params = StartupMessageParams::new([(
"options",
"project = foo neon_endpoint_type:read_write neon_lsn:0/2",
)]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
}
}

View File

@@ -178,6 +178,7 @@ pub struct ConsoleReqExtra<'a> {
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub options: Option<&'a str>,
}
/// Auth secret which is managed by the cloud.

View File

@@ -99,6 +99,7 @@ impl Api {
.query(&[
("application_name", extra.application_name),
("project", Some(project)),
("options", extra.options),
])
.build()?;
@@ -151,7 +152,7 @@ impl super::Api for Api {
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = creds.project().expect("impossible");
let key: &str = &creds.cache_key;
// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);

View File

@@ -1,3 +1,5 @@
#![deny(clippy::undocumented_unsafe_blocks)]
use std::convert::Infallible;
use anyhow::{bail, Context};

View File

@@ -15,10 +15,12 @@ use crate::{
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use itertools::Itertools;
use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use prometheus::{register_histogram_vec, HistogramVec};
use regex::Regex;
use std::{error::Error, io, ops::ControlFlow, sync::Arc, time::Instant};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
@@ -881,9 +883,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;
let console_options = neon_options(params);
let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
options: console_options.as_deref(),
};
let mut latency_timer = LatencyTimer::new(mode.protocol_label());
@@ -945,3 +950,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
proxy_pass(stream, node.stream, &aux).await
}
}
pub fn neon_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| is_neon_param(opt))
.sorted() // we sort it to use as cache key
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
// Don't even bother with empty options.
if options.is_empty() {
return None;
}
Some(options)
}
pub fn is_neon_param(bytes: &str) -> bool {
static RE: OnceCell<Regex> = OnceCell::new();
RE.get_or_init(|| Regex::new(r"^neon_\w+:").unwrap());
RE.get().unwrap().is_match(bytes)
}

View File

@@ -440,6 +440,7 @@ fn helper_create_connect_info(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
options: None,
};
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)

View File

@@ -22,7 +22,10 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use crate::{
auth, console,
proxy::{LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
proxy::{
neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER,
NUM_DB_CONNECTIONS_OPENED_COUNTER,
},
usage_metrics::{Ids, MetricCounter, USAGE_METRICS},
};
use crate::{compute, config};
@@ -41,6 +44,7 @@ pub struct ConnInfo {
pub dbname: String,
pub hostname: String,
pub password: String,
pub options: Option<String>,
}
impl ConnInfo {
@@ -401,26 +405,25 @@ async fn connect_to_compute(
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let credential_params = StartupMessageParams::new([
let params = StartupMessageParams::new([
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
("options", conn_info.options.as_deref().unwrap_or("")),
]);
let creds = config
.auth_backend
.as_ref()
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.map(|_| auth::ClientCredentials::parse(&params, Some(&conn_info.hostname), common_names))
.transpose()?;
let console_options = neon_options(&params);
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
options: console_options.as_deref(),
};
let node_info = creds

View File

@@ -174,11 +174,23 @@ fn get_conn_info(
}
}
let pairs = connection_url.query_pairs();
let mut options = Option::None;
for (key, value) in pairs {
if key == "options" {
options = Some(value.to_string());
break;
}
}
Ok(ConnInfo {
username: username.to_owned(),
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
options,
})
}

View File

@@ -1,3 +1,5 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod garbage;

View File

@@ -38,7 +38,7 @@ use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope};
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
use utils::{
id::NodeId,
logging::{self, LogFormat},
@@ -251,10 +251,9 @@ async fn main() -> anyhow::Result<()> {
None
}
Some(path) => {
info!("loading http auth JWT key from {path}");
Some(Arc::new(
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
))
info!("loading http auth JWT key(s) from {path}");
let jwt_auth = JwtAuth::from_key_path(path).context("failed to load the auth key")?;
Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
}
};

View File

@@ -30,7 +30,7 @@ use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use utils::{
auth::JwtAuth,
auth::SwappableJwtAuth,
http::{
endpoint::{self, auth_middleware, check_permission_with},
error::ApiError,
@@ -428,8 +428,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {
// Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
// Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
request
.data::<Option<Arc<SwappableJwtAuth>>>()
.unwrap()
.as_deref()
}
}))
}

View File

@@ -1,3 +1,4 @@
#![deny(clippy::undocumented_unsafe_blocks)]
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
@@ -6,7 +7,10 @@ use tokio::runtime::Runtime;
use std::time::Duration;
use storage_broker::Uri;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::{
auth::SwappableJwtAuth,
id::{NodeId, TenantId, TenantTimelineId},
};
mod auth;
pub mod broker;
@@ -69,7 +73,7 @@ pub struct SafeKeeperConf {
pub wal_backup_enabled: bool,
pub pg_auth: Option<Arc<JwtAuth>>,
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
pub http_auth: Option<Arc<JwtAuth>>,
pub http_auth: Option<Arc<SwappableJwtAuth>>,
pub current_thread_runtime: bool,
}

View File

@@ -111,7 +111,7 @@ impl WalReceivers {
.count()
}
/// Unregister walsender.
/// Unregister walreceiver.
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
@@ -138,8 +138,8 @@ pub enum WalReceiverStatus {
Streaming,
}
/// Scope guard to access slot in WalSenders registry and unregister from it in
/// Drop.
/// Scope guard to access slot in WalReceivers registry and unregister from
/// it in Drop.
pub struct WalReceiverGuard {
id: WalReceiverId,
walreceivers: Arc<WalReceivers>,

View File

@@ -361,7 +361,6 @@ class PgProtocol:
@dataclass
class AuthKeys:
pub: str
priv: str
def generate_token(self, *, scope: str, **token_data: str) -> str:
@@ -877,9 +876,31 @@ class NeonEnv:
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / "auth_public_key.pem").read_text()
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
return AuthKeys(pub=pub, priv=priv)
return AuthKeys(priv=priv)
def regenerate_keys_at(self, privkey_path: Path, pubkey_path: Path):
# compare generate_auth_keys() in local_env.rs
subprocess.run(
["openssl", "genpkey", "-algorithm", "ed25519", "-out", privkey_path],
cwd=self.repo_dir,
check=True,
)
subprocess.run(
[
"openssl",
"pkey",
"-in",
privkey_path,
"-pubout",
"-out",
pubkey_path,
],
cwd=self.repo_dir,
check=True,
)
del self.auth_keys
def generate_endpoint_id(self) -> str:
"""

View File

@@ -189,6 +189,10 @@ class PageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def reload_auth_validation_keys(self):
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
self.verbose_error(res)
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)

View File

@@ -1,12 +1,35 @@
import os
from contextlib import closing
from pathlib import Path
import psycopg2
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgProtocol,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.types import TenantId, TimelineId
def assert_client_authorized(env: NeonEnv, http_client: PageserverHttpClient):
http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
def assert_client_not_authorized(env: NeonEnv, http_client: PageserverHttpClient):
with pytest.raises(
PageserverApiException,
match="Unauthorized: malformed jwt token",
):
assert_client_authorized(env, http_client)
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
@@ -27,30 +50,16 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
ps.safe_psql("set FOO", password=pageserver_token)
# tenant can create branches
tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
assert_client_authorized(env, tenant_http_client)
# console can create branches for tenant
pageserver_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
assert_client_authorized(env, pageserver_http_client)
# fail to create branch using token with different tenant_id
with pytest.raises(
PageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied"
):
invalid_tenant_http_client.timeline_create(
pg_version=env.pg_version,
tenant_id=env.initial_tenant,
new_timeline_id=TimelineId.generate(),
ancestor_timeline_id=env.initial_timeline,
)
assert_client_authorized(env, invalid_tenant_http_client)
# create tenant using management token
pageserver_http_client.tenant_create(TenantId.generate())
@@ -82,6 +91,94 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
assert cur.fetchone() == (5000050000,)
def test_pageserver_multiple_keys(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*Unauthorized: malformed jwt token.*")
pageserver_token_old = env.auth_keys.generate_pageserver_token()
pageserver_http_client_old = env.pageserver.http_client(pageserver_token_old)
pageserver_http_client_old.reload_auth_validation_keys()
# This test is to ensure that the pageserver supports multiple keys.
# The neon_local tool generates one key pair at a hardcoded path by default.
# As a preparation for our test, move the public key of the key pair into a
# directory at the same location as the hardcoded path by:
# 1. moving the the file at `configured_pub_key_path` to a temporary location
# 2. creating a new directory at `configured_pub_key_path`
# 3. moving the file from the temporary location into the newly created directory
configured_pub_key_path = Path(env.repo_dir) / "auth_public_key.pem"
os.rename(configured_pub_key_path, Path(env.repo_dir) / "auth_public_key.pem.file")
os.mkdir(configured_pub_key_path)
os.rename(
Path(env.repo_dir) / "auth_public_key.pem.file",
configured_pub_key_path / "auth_public_key_old.pem",
)
# Add a new key pair
# This invalidates env.auth_keys and makes them be regenerated
env.regenerate_keys_at(
Path("auth_private_key.pem"), Path("auth_public_key.pem/auth_public_key_new.pem")
)
# Reload the keys on the pageserver side
pageserver_http_client_old.reload_auth_validation_keys()
# We can continue doing things using the old token
assert_client_authorized(env, pageserver_http_client_old)
pageserver_token_new = env.auth_keys.generate_pageserver_token()
pageserver_http_client_new = env.pageserver.http_client(pageserver_token_new)
# The new token also works
assert_client_authorized(env, pageserver_http_client_new)
# Remove the old token and reload
os.remove(Path(env.repo_dir) / "auth_public_key.pem" / "auth_public_key_old.pem")
pageserver_http_client_old.reload_auth_validation_keys()
# Reloading fails now with the old token, but the new token still works
assert_client_not_authorized(env, pageserver_http_client_old)
assert_client_authorized(env, pageserver_http_client_new)
def test_pageserver_key_reload(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*Unauthorized: malformed jwt token.*")
pageserver_token_old = env.auth_keys.generate_pageserver_token()
pageserver_http_client_old = env.pageserver.http_client(pageserver_token_old)
pageserver_http_client_old.reload_auth_validation_keys()
# Regenerate the keys
env.regenerate_keys_at(Path("auth_private_key.pem"), Path("auth_public_key.pem"))
# Reload the keys on the pageserver side
pageserver_http_client_old.reload_auth_validation_keys()
# Next attempt fails as we use the old auth token
with pytest.raises(
PageserverApiException,
match="Unauthorized: malformed jwt token",
):
pageserver_http_client_old.reload_auth_validation_keys()
# same goes for attempts trying to create a timeline
assert_client_not_authorized(env, pageserver_http_client_old)
pageserver_token_new = env.auth_keys.generate_pageserver_token()
pageserver_http_client_new = env.pageserver.http_client(pageserver_token_new)
# timeline creation works with the new token
assert_client_authorized(env, pageserver_http_client_new)
# reloading also works with the new token
pageserver_http_client_new.reload_auth_validation_keys()
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled

View File

@@ -17,10 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
n_restarts = 10
scale = 10
# Pageserver currently logs requests on non-active tenants at error level
# https://github.com/neondatabase/neon/issues/5784
env.pageserver.allowed_errors.append(".* will not become active. Current state: Stopping.*")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])