mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
9 Commits
release-pr
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e455e1680 | ||
|
|
91812e7b00 | ||
|
|
74bfaee67e | ||
|
|
5288c8ca35 | ||
|
|
ecb1cb21aa | ||
|
|
7109db0e58 | ||
|
|
151d07674c | ||
|
|
1f94e31025 | ||
|
|
803b765c76 |
12
.github/file-filters.yaml
vendored
12
.github/file-filters.yaml
vendored
@@ -1,12 +0,0 @@
|
||||
rust_code: ['**/*.rs', '**/Cargo.toml', '**/Cargo.lock']
|
||||
|
||||
v14: ['vendor/postgres-v14/**', 'Makefile', 'pgxn/**']
|
||||
v15: ['vendor/postgres-v15/**', 'Makefile', 'pgxn/**']
|
||||
v16: ['vendor/postgres-v16/**', 'Makefile', 'pgxn/**']
|
||||
v17: ['vendor/postgres-v17/**', 'Makefile', 'pgxn/**']
|
||||
|
||||
rebuild_neon_extra:
|
||||
- .github/workflows/neon_extra_builds.yml
|
||||
|
||||
rebuild_macos:
|
||||
- .github/workflows/build-macos.yml
|
||||
241
.github/workflows/build-macos.yml
vendored
241
.github/workflows/build-macos.yml
vendored
@@ -1,241 +0,0 @@
|
||||
name: Check neon with MacOS builds
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
pg_versions:
|
||||
description: "Array of the pg versions to build for, for example: ['v14', 'v17']"
|
||||
type: string
|
||||
default: '[]'
|
||||
required: false
|
||||
rebuild_rust_code:
|
||||
description: "Rebuild Rust code"
|
||||
type: boolean
|
||||
default: false
|
||||
required: false
|
||||
rebuild_everything:
|
||||
description: "If true, rebuild for all versions"
|
||||
type: boolean
|
||||
default: false
|
||||
required: false
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
|
||||
# TODO: move `check-*` and `files-changed` jobs to the "Caller" Workflow
|
||||
# We should care about that as Github has limitations:
|
||||
# - You can connect up to four levels of workflows
|
||||
# - You can call a maximum of 20 unique reusable workflows from a single workflow file.
|
||||
# https://docs.github.com/en/actions/sharing-automations/reusing-workflows#limitations
|
||||
jobs:
|
||||
build-pgxn:
|
||||
if: |
|
||||
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
)
|
||||
timeout-minutes: 30
|
||||
runs-on: macos-15
|
||||
strategy:
|
||||
matrix:
|
||||
postgres-version: ${{ inputs.rebuild_everything && fromJson('["v14", "v15", "v16", "v17"]') || fromJSON(inputs.pg_versions) }}
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
steps:
|
||||
- name: Checkout main repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set pg ${{ matrix.postgres-version }} for caching
|
||||
id: pg_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-${{ matrix.postgres-version }}) | tee -a "${GITHUB_OUTPUT}"
|
||||
|
||||
- name: Cache postgres ${{ matrix.postgres-version }} build
|
||||
id: cache_pg
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/${{ matrix.postgres-version }}
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Checkout submodule vendor/postgres-${{ matrix.postgres-version }}
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
git submodule init vendor/postgres-${{ matrix.postgres-version }}
|
||||
git submodule update --depth 1 --recursive
|
||||
|
||||
- name: Install build dependencies
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Set extra env for macOS
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
- name: Build Postgres ${{ matrix.postgres-version }}
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
make postgres-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build Neon Pg Ext ${{ matrix.postgres-version }}
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
make "neon-pg-ext-${{ matrix.postgres-version }}" -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Get postgres headers ${{ matrix.postgres-version }}
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
make postgres-headers-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
|
||||
|
||||
build-walproposer-lib:
|
||||
if: |
|
||||
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
)
|
||||
timeout-minutes: 30
|
||||
runs-on: macos-15
|
||||
needs: [build-pgxn]
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
steps:
|
||||
- name: Checkout main repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set pg v17 for caching
|
||||
id: pg_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
|
||||
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache walproposer-lib
|
||||
id: cache_walproposer_lib
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/build/walproposer-lib
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Checkout submodule vendor/postgres-v17
|
||||
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
git submodule init vendor/postgres-v17
|
||||
git submodule update --depth 1 --recursive
|
||||
|
||||
- name: Install build dependencies
|
||||
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Set extra env for macOS
|
||||
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
- name: Build walproposer-lib (only for v17)
|
||||
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
|
||||
run:
|
||||
make walproposer-lib -j$(sysctl -n hw.ncpu)
|
||||
|
||||
cargo-build:
|
||||
if: |
|
||||
(inputs.pg_versions != '[]' || inputs.rebuild_rust_code || inputs.rebuild_everything) && (
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
)
|
||||
timeout-minutes: 30
|
||||
runs-on: macos-15
|
||||
needs: [build-pgxn, build-walproposer-lib]
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
steps:
|
||||
- name: Checkout main repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Set pg v14 for caching
|
||||
id: pg_rev_v14
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) | tee -a "${GITHUB_OUTPUT}"
|
||||
- name: Set pg v15 for caching
|
||||
id: pg_rev_v15
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) | tee -a "${GITHUB_OUTPUT}"
|
||||
- name: Set pg v16 for caching
|
||||
id: pg_rev_v16
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) | tee -a "${GITHUB_OUTPUT}"
|
||||
- name: Set pg v17 for caching
|
||||
id: pg_rev_v17
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_v15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_v16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg_v17
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache cargo deps (only for v17)
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
|
||||
|
||||
- name: Cache walproposer-lib
|
||||
id: cache_walproposer_lib
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/build/walproposer-lib
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Install build dependencies
|
||||
run: |
|
||||
brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Set extra env for macOS
|
||||
run: |
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
- name: Run cargo build (only for v17)
|
||||
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Check that no warnings are produced (only for v17)
|
||||
run: ./run_clippy.sh
|
||||
10
.github/workflows/build_and_test.yml
vendored
10
.github/workflows/build_and_test.yml
vendored
@@ -979,16 +979,6 @@ jobs:
|
||||
VERSIONS: v14 v15 v16 v17
|
||||
|
||||
steps:
|
||||
- name: Configure AWS credentials
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
aws-region: eu-central-1
|
||||
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
role-duration-seconds: 3600
|
||||
|
||||
- name: Login to Amazon Dev ECR
|
||||
uses: aws-actions/amazon-ecr-login@v2
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
|
||||
139
.github/workflows/neon_extra_builds.yml
vendored
139
.github/workflows/neon_extra_builds.yml
vendored
@@ -31,15 +31,19 @@ jobs:
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
secrets: inherit
|
||||
|
||||
files-changed:
|
||||
name: Detect what files changed
|
||||
runs-on: ubuntu-22.04
|
||||
timeout-minutes: 3
|
||||
outputs:
|
||||
v17: ${{ steps.files_changed.outputs.v17 }}
|
||||
postgres_changes: ${{ steps.postgres_changes.outputs.changes }}
|
||||
rebuild_rust_code: ${{ steps.files_changed.outputs.rust_code }}
|
||||
rebuild_everything: ${{ steps.files_changed.outputs.rebuild_neon_extra || steps.files_changed.outputs.rebuild_macos }}
|
||||
check-macos-build:
|
||||
needs: [ check-permissions ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
timeout-minutes: 90
|
||||
runs-on: macos-15
|
||||
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -47,45 +51,106 @@ jobs:
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Check for Postgres changes
|
||||
uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3
|
||||
id: files_changed
|
||||
- name: Install macOS postgres dependencies
|
||||
run: brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Set pg 14 revision for caching
|
||||
id: pg_v14_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 15 revision for caching
|
||||
id: pg_v15_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 16 revision for caching
|
||||
id: pg_v16_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 17 revision for caching
|
||||
id: pg_v17_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
token: ${{ github.token }}
|
||||
filters: .github/file-filters.yaml
|
||||
base: ${{ github.event_name != 'pull_request' && (github.event.merge_group.base_ref || github.ref_name) || '' }}
|
||||
ref: ${{ github.event_name != 'pull_request' && (github.event.merge_group.head_ref || github.ref) || '' }}
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Filter out only v-string for build matrix
|
||||
id: postgres_changes
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg_17
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Set extra env for macOS
|
||||
run: |
|
||||
v_strings_only_as_json_array=$(echo ${{ steps.files_changed.outputs.chnages }} | jq '.[]|select(test("v\\d+"))' | jq --slurp -c)
|
||||
echo "changes=${v_strings_only_as_json_array}" | tee -a "${GITHUB_OUTPUT}"
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
check-macos-build:
|
||||
needs: [ check-permissions, files-changed ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
uses: ./.github/workflows/build-macos.yml
|
||||
with:
|
||||
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
|
||||
rebuild_rust_code: ${{ needs.files-changed.outputs.rebuild_rust_code }}
|
||||
rebuild_everything: ${{ fromJson(needs.files-changed.outputs.rebuild_everything) }}
|
||||
- name: Cache cargo deps
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
run: make postgres-v14 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v15
|
||||
if: steps.cache_pg_15.outputs.cache-hit != 'true'
|
||||
run: make postgres-v15 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v16
|
||||
if: steps.cache_pg_16.outputs.cache-hit != 'true'
|
||||
run: make postgres-v16 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v17
|
||||
if: steps.cache_pg_17.outputs.cache-hit != 'true'
|
||||
run: make postgres-v17 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build neon extensions
|
||||
run: make neon-pg-ext -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build walproposer-lib
|
||||
run: make walproposer-lib -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Run cargo build
|
||||
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release
|
||||
|
||||
- name: Check that no warnings are produced
|
||||
run: ./run_clippy.sh
|
||||
|
||||
gather-rust-build-stats:
|
||||
needs: [ check-permissions, build-build-tools-image, files-changed ]
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
contents: write
|
||||
if: |
|
||||
(needs.files-changed.outputs.v17 == 'true' || needs.files-changed.outputs.rebuild_everything == 'true') && (
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
)
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
runs-on: [ self-hosted, large ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
|
||||
85
Cargo.lock
generated
85
Cargo.lock
generated
@@ -1323,6 +1323,45 @@ dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-api"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"tonic",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-subscriber"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01"
|
||||
dependencies = [
|
||||
"console-api",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"futures-task",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"hyper-util",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.9.6"
|
||||
@@ -3513,6 +3552,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.1"
|
||||
@@ -3808,6 +3857,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.11.1"
|
||||
@@ -6636,6 +6691,7 @@ dependencies = [
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@@ -6967,6 +7023,17 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-chrome"
|
||||
version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf0a738ed5d6450a9fb96e86a23ad808de2b727fd1394585da5cdd6788ffe724"
|
||||
dependencies = [
|
||||
"serde_json",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.33"
|
||||
@@ -6987,6 +7054,17 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-flame"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
@@ -7033,6 +7111,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -7242,6 +7321,7 @@ dependencies = [
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
"chrono",
|
||||
"console-subscriber",
|
||||
"const_format",
|
||||
"criterion",
|
||||
"diatomic-waker",
|
||||
@@ -7283,7 +7363,9 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-chrome",
|
||||
"tracing-error",
|
||||
"tracing-flame",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
@@ -7797,6 +7879,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"crypto-bigint 0.5.5",
|
||||
"der 0.7.8",
|
||||
"deranged",
|
||||
@@ -7813,6 +7896,7 @@ dependencies = [
|
||||
"getrandom 0.2.11",
|
||||
"half",
|
||||
"hashbrown 0.14.5",
|
||||
"hdrhistogram",
|
||||
"hex",
|
||||
"hmac",
|
||||
"hyper 0.14.30",
|
||||
@@ -7870,6 +7954,7 @@ dependencies = [
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"zerocopy",
|
||||
"zeroize",
|
||||
|
||||
@@ -69,7 +69,7 @@ enum EncryptionSecret {
|
||||
|
||||
#[tokio::main]
|
||||
pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
utils::logging::init(
|
||||
let _guard = utils::logging::init(
|
||||
utils::logging::LogFormat::Plain,
|
||||
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -15,7 +15,7 @@ use std::time::Instant;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::spec::{Database, PgIdent, Role};
|
||||
use compute_api::spec::{PgIdent, Role};
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
@@ -45,10 +45,8 @@ use crate::spec_apply::ApplySpecPhase::{
|
||||
DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
|
||||
RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase;
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
|
||||
HandleAnonExtension,
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension,
|
||||
};
|
||||
use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
@@ -836,7 +834,7 @@ impl ComputeNode {
|
||||
conf
|
||||
}
|
||||
|
||||
pub async fn get_maintenance_client(
|
||||
async fn get_maintenance_client(
|
||||
conf: &tokio_postgres::Config,
|
||||
) -> Result<tokio_postgres::Client> {
|
||||
let mut conf = conf.clone();
|
||||
@@ -945,78 +943,6 @@ impl ComputeNode {
|
||||
dbs: databases,
|
||||
}));
|
||||
|
||||
// Apply special pre drop database phase.
|
||||
// NOTE: we use the code of RunInEachDatabase phase for parallelism
|
||||
// and connection management, but we don't really run it in *each* database,
|
||||
// only in databases, we're about to drop.
|
||||
info!("Applying PerDatabase (pre-dropdb) phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
// Run the phase for each database that we're about to drop.
|
||||
let db_processes = spec
|
||||
.delta_operations
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter_map(move |op| {
|
||||
if op.action.as_str() == "delete_db" {
|
||||
Some(op.name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|dbname| {
|
||||
let spec = spec.clone();
|
||||
let ctx = ctx.clone();
|
||||
let jwks_roles = jwks_roles.clone();
|
||||
let mut conf = conf.as_ref().clone();
|
||||
let concurrency_token = concurrency_token.clone();
|
||||
// We only need dbname field for this phase, so set other fields to dummy values
|
||||
let db = DB::UserDB(Database {
|
||||
name: dbname.clone(),
|
||||
owner: "cloud_admin".to_string(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
|
||||
debug!("Applying per-database phases for Database {:?}", &db);
|
||||
|
||||
match &db {
|
||||
DB::SystemDB => {}
|
||||
DB::UserDB(db) => {
|
||||
conf.dbname(db.name.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let fut = Self::apply_spec_sql_db(
|
||||
spec.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
[DropSubscriptionsForDeletedDatabases].to_vec(),
|
||||
);
|
||||
|
||||
Ok(spawn(fut))
|
||||
})
|
||||
.collect::<Vec<Result<_, anyhow::Error>>>();
|
||||
|
||||
for process in db_processes.into_iter() {
|
||||
let handle = process?;
|
||||
if let Err(e) = handle.await? {
|
||||
// Handle the error case where the database does not exist
|
||||
// We do not check whether the DB exists or not in the deletion phase,
|
||||
// so we shouldn't be strict about it in pre-deletion cleanup as well.
|
||||
if e.to_string().contains("does not exist") {
|
||||
warn!("Error dropping subscription: {}", e);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for phase in [
|
||||
CreateSuperUser,
|
||||
DropInvalidDatabases,
|
||||
@@ -1036,7 +962,7 @@ impl ComputeNode {
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!("Applying RunInEachDatabase2 phase");
|
||||
info!("Applying RunInEachDatabase phase");
|
||||
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
|
||||
|
||||
let db_processes = spec
|
||||
@@ -1071,12 +997,6 @@ impl ComputeNode {
|
||||
jwks_roles.clone(),
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
[
|
||||
DeleteDBRoleReferences,
|
||||
ChangeSchemaPerms,
|
||||
HandleAnonExtension,
|
||||
]
|
||||
.to_vec(),
|
||||
);
|
||||
|
||||
Ok(spawn(fut))
|
||||
@@ -1123,13 +1043,16 @@ impl ComputeNode {
|
||||
jwks_roles: Arc<HashSet<String>>,
|
||||
concurrency_token: Arc<tokio::sync::Semaphore>,
|
||||
db: DB,
|
||||
subphases: Vec<PerDatabasePhase>,
|
||||
) -> Result<()> {
|
||||
let _permit = concurrency_token.acquire().await?;
|
||||
|
||||
let mut client_conn = None;
|
||||
|
||||
for subphase in subphases {
|
||||
for subphase in [
|
||||
DeleteDBRoleReferences,
|
||||
ChangeSchemaPerms,
|
||||
HandleAnonExtension,
|
||||
] {
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
|
||||
@@ -47,7 +47,6 @@ pub enum PerDatabasePhase {
|
||||
DeleteDBRoleReferences,
|
||||
ChangeSchemaPerms,
|
||||
HandleAnonExtension,
|
||||
DropSubscriptionsForDeletedDatabases,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -327,12 +326,13 @@ async fn get_operations<'a>(
|
||||
|
||||
// Use FORCE to drop database even if there are active connections.
|
||||
// We run this from `cloud_admin`, so it should have enough privileges.
|
||||
//
|
||||
// NB: there could be other db states, which prevent us from dropping
|
||||
// the database. For example, if db is used by any active subscription
|
||||
// or replication slot.
|
||||
// Such cases are handled in the DropSubscriptionsForDeletedDatabases
|
||||
// phase. We do all the cleanup before actually dropping the database.
|
||||
// TODO: deal with it once we allow logical replication. Proper fix should
|
||||
// involve returning an error code to the control plane, so it could
|
||||
// figure out that this is a non-retryable error, return it to the user
|
||||
// and fail operation permanently.
|
||||
let drop_db_query: String = format!(
|
||||
"DROP DATABASE IF EXISTS {} WITH (FORCE)",
|
||||
&op.name.pg_quote()
|
||||
@@ -444,30 +444,6 @@ async fn get_operations<'a>(
|
||||
}
|
||||
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
|
||||
match subphase {
|
||||
PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
|
||||
match &db {
|
||||
DB::UserDB(db) => {
|
||||
let drop_subscription_query: String = format!(
|
||||
include_str!("sql/drop_subscription_for_drop_dbs.sql"),
|
||||
datname_str = escape_literal(&db.name),
|
||||
);
|
||||
|
||||
let operations = vec![Operation {
|
||||
query: drop_subscription_query,
|
||||
comment: Some(format!(
|
||||
"optionally dropping subscriptions for DB {}",
|
||||
db.name,
|
||||
)),
|
||||
}]
|
||||
.into_iter();
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
// skip this cleanup for the system databases
|
||||
// because users can't drop them
|
||||
DB::SystemDB => Ok(Box::new(empty())),
|
||||
}
|
||||
}
|
||||
PerDatabasePhase::DeleteDBRoleReferences => {
|
||||
let ctx = ctx.read().await;
|
||||
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
subname TEXT;
|
||||
BEGIN
|
||||
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
@@ -154,7 +154,7 @@ mod reliable_copy_test {
|
||||
/// Run test simulations.
|
||||
#[test]
|
||||
fn sim_example_reliable_copy() {
|
||||
utils::logging::init(
|
||||
let _guard = utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -13,7 +13,7 @@ use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
static LOGGING_DONE: OnceCell<()> = OnceCell::new();
|
||||
static LOGGING_DONE: OnceCell<utils::logging::FlushGuard> = OnceCell::new();
|
||||
|
||||
pub(crate) fn upload_stream(
|
||||
content: std::borrow::Cow<'static, [u8]>,
|
||||
@@ -210,6 +210,6 @@ pub(crate) fn ensure_logging_ready() {
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
)
|
||||
.expect("logging init failed");
|
||||
.expect("logging init failed")
|
||||
});
|
||||
}
|
||||
|
||||
@@ -10,6 +10,10 @@ default = []
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
# Enables debugging functionality that's based on the `tracing` crate,
|
||||
# e.g., tokio-console or tracing-chrome.
|
||||
tracing-based-debugging = [ "console-subscriber", "tracing-chrome", "tracing-flame", "tokio/tracing" ]
|
||||
|
||||
[dependencies]
|
||||
arc-swap.workspace = true
|
||||
sentry.workspace = true
|
||||
@@ -20,6 +24,7 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
console-subscriber = { version = "0.4.1", optional = true }
|
||||
diatomic-waker.workspace = true
|
||||
flate2.workspace = true
|
||||
git-version.workspace = true
|
||||
@@ -47,7 +52,9 @@ tokio-tar.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = ["serde"] }
|
||||
tracing.workspace = true
|
||||
tracing-chrome = { version = "0.7.2", optional = true }
|
||||
tracing-error.workspace = true
|
||||
tracing-flame = { version = "0.2.0", optional = true }
|
||||
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
|
||||
rand.workspace = true
|
||||
scopeguard.workspace = true
|
||||
@@ -78,6 +85,9 @@ camino-tempfile.workspace = true
|
||||
serde_assert.workspace = true
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
|
||||
[lints.rust]
|
||||
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
|
||||
|
||||
[[bench]]
|
||||
name = "benchmarks"
|
||||
harness = false
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
#[cfg(feature = "tracing-based-debugging")]
|
||||
use std::{
|
||||
io::BufWriter,
|
||||
num::NonZeroUsize,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -98,11 +105,44 @@ pub enum Output {
|
||||
Stderr,
|
||||
}
|
||||
|
||||
/// Keep alive and drop it before the program terminates.
|
||||
#[allow(dead_code)] // We need to store the `Arc<>` for drop semantics.
|
||||
#[must_use]
|
||||
#[cfg(feature = "tracing-based-debugging")]
|
||||
pub struct FlushGuard(Arc<Mutex<FlushGuardInner>>);
|
||||
|
||||
#[cfg(feature = "tracing-based-debugging")]
|
||||
struct FlushGuardInner {
|
||||
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
|
||||
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing-based-debugging"))]
|
||||
pub struct FlushGuard;
|
||||
|
||||
/// Initialize the global tracing subscriber.
|
||||
///
|
||||
/// # Tracing-Based Debugging
|
||||
///
|
||||
/// If feature `tracing-based-debugging` is enabled, this function will add support
|
||||
/// for runtime enablement of various tracing-based debugging tools.
|
||||
///
|
||||
/// The feature is disabled by default to avoid compile time bloat.
|
||||
///
|
||||
/// For example, to use the `tracing-chrome` crate to debug pageserver:
|
||||
///
|
||||
/// 1. Enable the feature by adding `tracing-based-debugging` to the `features` list in
|
||||
/// the pageserver crate's `Cargo.toml`.
|
||||
/// 2. Build pageserver.
|
||||
/// 3. Launch pageserver with env var `NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME=1`.
|
||||
/// 4. Cleanly shut down pageserver.
|
||||
/// 5. Follow instructions of the `tracing-chrome` crate to post-process and visualize
|
||||
/// the trace files.
|
||||
pub fn init(
|
||||
log_format: LogFormat,
|
||||
tracing_error_layer_enablement: TracingErrorLayerEnablement,
|
||||
output: Output,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<FlushGuard> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let rust_log_env_filter = || {
|
||||
@@ -113,7 +153,9 @@ pub fn init(
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
let r = tracing_subscriber::registry();
|
||||
|
||||
let r = r.with({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
@@ -131,17 +173,95 @@ pub fn init(
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
});
|
||||
|
||||
let r = r.with(
|
||||
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
|
||||
);
|
||||
match tracing_error_layer_enablement {
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
|
||||
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
|
||||
.init(),
|
||||
TracingErrorLayerEnablement::Disabled => r.init(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
let r = r.with(match tracing_error_layer_enablement {
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => {
|
||||
Some(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
|
||||
}
|
||||
TracingErrorLayerEnablement::Disabled => None,
|
||||
});
|
||||
|
||||
#[cfg(feature = "tracing-based-debugging")]
|
||||
let (r, guard) = {
|
||||
let tracing_chrome_layer_flush_guard;
|
||||
let r = r.with(
|
||||
if crate::env::var("NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME").unwrap_or(false) {
|
||||
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
|
||||
.trace_style(tracing_chrome::TraceStyle::Async)
|
||||
.build();
|
||||
tracing_chrome_layer_flush_guard = Some(guard);
|
||||
Some(layer.with_filter(rust_log_env_filter()))
|
||||
} else {
|
||||
tracing_chrome_layer_flush_guard = None;
|
||||
None
|
||||
},
|
||||
);
|
||||
|
||||
let tracing_flame_flush_guard;
|
||||
let r = r.with(
|
||||
if crate::env::var("NEON_UTILS_LOGGING_ENABLE_TRACING_FLAME").unwrap_or(false) {
|
||||
let (layer, guard) =
|
||||
tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
|
||||
let layer = layer
|
||||
.with_empty_samples(false)
|
||||
.with_module_path(false)
|
||||
.with_file_and_line(false)
|
||||
.with_threads_collapsed(true);
|
||||
tracing_flame_flush_guard = Some(guard);
|
||||
Some(layer.with_filter(rust_log_env_filter()))
|
||||
} else {
|
||||
tracing_flame_flush_guard = None;
|
||||
None
|
||||
},
|
||||
);
|
||||
|
||||
let r = {
|
||||
let varname = "NEON_UTILS_LOGGING_ENABLE_TOKIO_CONSOLE";
|
||||
let console_subscriber_config: Option<NonZeroUsize> = crate::env::var(varname);
|
||||
#[cfg(tokio_unstable)]
|
||||
{
|
||||
r.with(match console_subscriber_config {
|
||||
Some(n) => {
|
||||
use console_subscriber::ConsoleLayer;
|
||||
Some(
|
||||
console_subscriber::Builder::default()
|
||||
.event_buffer_capacity(
|
||||
n.get() * ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
|
||||
)
|
||||
.client_buffer_capacity(
|
||||
n.get() * ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
|
||||
)
|
||||
.spawn(),
|
||||
)
|
||||
}
|
||||
None => None,
|
||||
})
|
||||
}
|
||||
#[cfg(not(tokio_unstable))]
|
||||
if console_subscriber_config.is_some() {
|
||||
panic!("recompile with --cfg tokio_unstable to enable {varname}");
|
||||
} else {
|
||||
r
|
||||
}
|
||||
};
|
||||
(
|
||||
r,
|
||||
FlushGuard(Arc::new(Mutex::new(FlushGuardInner {
|
||||
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
|
||||
_tracing_flame_layer: tracing_flame_flush_guard,
|
||||
}))),
|
||||
)
|
||||
};
|
||||
#[cfg(not(feature = "tracing-based-debugging"))]
|
||||
let (r, guard) = (r, FlushGuard);
|
||||
|
||||
r.init();
|
||||
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
/// Disable the default rust panic hook by using `set_hook`.
|
||||
|
||||
@@ -3,7 +3,7 @@ use pageserver_compaction::interface::CompactionLayer;
|
||||
use pageserver_compaction::simulator::MockTimeline;
|
||||
use utils::logging;
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
|
||||
|
||||
pub(crate) fn setup_logging() {
|
||||
LOG_HANDLE.get_or_init(|| {
|
||||
|
||||
@@ -115,7 +115,7 @@ struct AnalyzeLayerMapCmd {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::Plain,
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -32,7 +32,7 @@ enum Args {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
|
||||
@@ -53,10 +53,12 @@ project_build_tag!(BUILD_TAG);
|
||||
#[global_allocator]
|
||||
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
|
||||
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[export_name = "malloc_conf"]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
|
||||
// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
|
||||
// TODO: disabled because concurrent CPU profiles cause seg faults. See:
|
||||
// https://github.com/neondatabase/neon/issues/10225.
|
||||
//#[allow(non_upper_case_globals)]
|
||||
//#[export_name = "malloc_conf"]
|
||||
//pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
|
||||
|
||||
const PID_FILE_NAME: &str = "pageserver.pid";
|
||||
|
||||
@@ -112,7 +114,7 @@ fn main() -> anyhow::Result<()> {
|
||||
} else {
|
||||
TracingErrorLayerEnablement::Disabled
|
||||
};
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
conf.log_format,
|
||||
tracing_error_layer_enablement,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -5562,7 +5562,7 @@ pub(crate) mod harness {
|
||||
pub deletion_queue: MockDeletionQueue,
|
||||
}
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
|
||||
|
||||
pub(crate) fn setup_logging() {
|
||||
LOG_HANDLE.get_or_init(|| {
|
||||
|
||||
@@ -1,18 +1,16 @@
|
||||
use async_trait::async_trait;
|
||||
use postgres_client::config::SslMode;
|
||||
use pq_proto::BeMessage as Be;
|
||||
use std::fmt;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, info_span};
|
||||
|
||||
use super::{ComputeCredentialKeys, ControlPlaneApi};
|
||||
use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
|
||||
use super::ComputeCredentialKeys;
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::{self, client::cplane_proxy_v1, CachedNodeInfo, NodeInfo};
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::proxy::connect_compute::ComputeConnectBackend;
|
||||
use crate::stream::PqStream;
|
||||
@@ -33,13 +31,6 @@ pub(crate) enum ConsoleRedirectError {
|
||||
#[derive(Debug)]
|
||||
pub struct ConsoleRedirectBackend {
|
||||
console_uri: reqwest::Url,
|
||||
api: cplane_proxy_v1::NeonControlPlaneClient,
|
||||
}
|
||||
|
||||
impl fmt::Debug for cplane_proxy_v1::NeonControlPlaneClient {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "NeonControlPlaneClient")
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for ConsoleRedirectError {
|
||||
@@ -80,24 +71,9 @@ pub(crate) fn new_psql_session_id() -> String {
|
||||
hex::encode(rand::random::<[u8; 8]>())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BackendIpAllowlist for ConsoleRedirectBackend {
|
||||
async fn get_allowed_ips(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> auth::Result<Vec<auth::IpPattern>> {
|
||||
self.api
|
||||
.get_allowed_ips_and_secret(ctx, user_info)
|
||||
.await
|
||||
.map(|(ips, _)| ips.as_ref().clone())
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl ConsoleRedirectBackend {
|
||||
pub fn new(console_uri: reqwest::Url, api: cplane_proxy_v1::NeonControlPlaneClient) -> Self {
|
||||
Self { console_uri, api }
|
||||
pub fn new(console_uri: reqwest::Url) -> Self {
|
||||
Self { console_uri }
|
||||
}
|
||||
|
||||
pub(crate) async fn authenticate(
|
||||
|
||||
@@ -16,9 +16,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::auth::credentials::check_peer_addr_is_in_list;
|
||||
use crate::auth::{
|
||||
self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern,
|
||||
};
|
||||
use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint};
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
@@ -133,7 +131,7 @@ pub(crate) struct ComputeUserInfoNoEndpoint {
|
||||
pub(crate) options: NeonOptions,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ComputeUserInfo {
|
||||
pub(crate) endpoint: EndpointId,
|
||||
pub(crate) user: RoleName,
|
||||
@@ -246,15 +244,6 @@ impl AuthenticationConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait BackendIpAllowlist {
|
||||
async fn get_allowed_ips(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> auth::Result<Vec<auth::IpPattern>>;
|
||||
}
|
||||
|
||||
/// True to its name, this function encapsulates our current auth trade-offs.
|
||||
/// Here, we choose the appropriate auth flow based on circumstances.
|
||||
///
|
||||
@@ -267,7 +256,7 @@ async fn auth_quirks(
|
||||
allow_cleartext: bool,
|
||||
config: &'static AuthenticationConfig,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> auth::Result<(ComputeCredentials, Option<Vec<IpPattern>>)> {
|
||||
) -> auth::Result<ComputeCredentials> {
|
||||
// If there's no project so far, that entails that client doesn't
|
||||
// support SNI or other means of passing the endpoint (project) name.
|
||||
// We now expect to see a very specific payload in the place of password.
|
||||
@@ -326,7 +315,7 @@ async fn auth_quirks(
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(keys) => Ok((keys, Some(allowed_ips.as_ref().clone()))),
|
||||
Ok(keys) => Ok(keys),
|
||||
Err(e) => {
|
||||
if e.is_password_failed() {
|
||||
// The password could have been changed, so we invalidate the cache.
|
||||
@@ -396,7 +385,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
allow_cleartext: bool,
|
||||
config: &'static AuthenticationConfig,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> auth::Result<(Backend<'a, ComputeCredentials>, Option<Vec<IpPattern>>)> {
|
||||
) -> auth::Result<Backend<'a, ComputeCredentials>> {
|
||||
let res = match self {
|
||||
Self::ControlPlane(api, user_info) => {
|
||||
debug!(
|
||||
@@ -405,7 +394,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
"performing authentication using the console"
|
||||
);
|
||||
|
||||
let (credentials, ip_allowlist) = auth_quirks(
|
||||
let credentials = auth_quirks(
|
||||
ctx,
|
||||
&*api,
|
||||
user_info,
|
||||
@@ -415,7 +404,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
endpoint_rate_limiter,
|
||||
)
|
||||
.await?;
|
||||
Ok((Backend::ControlPlane(api, credentials), ip_allowlist))
|
||||
Backend::ControlPlane(api, credentials)
|
||||
}
|
||||
Self::Local(_) => {
|
||||
return Err(auth::AuthError::bad_auth_method("invalid for local proxy"))
|
||||
@@ -424,7 +413,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
|
||||
// TODO: replace with some metric
|
||||
info!("user successfully authenticated");
|
||||
res
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -452,24 +441,6 @@ impl Backend<'_, ComputeUserInfo> {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl BackendIpAllowlist for Backend<'_, ()> {
|
||||
async fn get_allowed_ips(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> auth::Result<Vec<auth::IpPattern>> {
|
||||
let auth_data = match self {
|
||||
Self::ControlPlane(api, ()) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
};
|
||||
|
||||
auth_data
|
||||
.map(|(ips, _)| ips.as_ref().clone())
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
|
||||
async fn wake_compute(
|
||||
@@ -815,7 +786,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(creds.0.info.endpoint, "my-endpoint");
|
||||
assert_eq!(creds.info.endpoint, "my-endpoint");
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
@@ -744,59 +744,9 @@ fn build_auth_backend(
|
||||
}
|
||||
|
||||
AuthBackendType::ConsoleRedirect => {
|
||||
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
|
||||
let project_info_cache_config: ProjectInfoCacheOptions =
|
||||
args.project_info_cache.parse()?;
|
||||
let endpoint_cache_config: config::EndpointCacheConfig =
|
||||
args.endpoint_cache_config.parse()?;
|
||||
let url = args.uri.parse()?;
|
||||
let backend = ConsoleRedirectBackend::new(url);
|
||||
|
||||
info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
|
||||
info!(
|
||||
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
|
||||
);
|
||||
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
|
||||
let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
|
||||
wake_compute_cache_config,
|
||||
project_info_cache_config,
|
||||
endpoint_cache_config,
|
||||
)));
|
||||
|
||||
let config::ConcurrencyLockOptions {
|
||||
shards,
|
||||
limiter,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.wake_compute_lock.parse()?;
|
||||
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
|
||||
"wake_compute_lock",
|
||||
limiter,
|
||||
shards,
|
||||
timeout,
|
||||
epoch,
|
||||
&Metrics::get().wake_compute_lock,
|
||||
)?));
|
||||
|
||||
let url = args.uri.clone().parse()?;
|
||||
let ep_url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
|
||||
let endpoint = http::Endpoint::new(ep_url, http::new_client());
|
||||
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
|
||||
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
|
||||
let wake_compute_endpoint_rate_limiter =
|
||||
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
|
||||
|
||||
// Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter
|
||||
// and locks are not used in ConsoleRedirectBackend,
|
||||
// but they are required by the NeonControlPlaneClient
|
||||
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
|
||||
endpoint,
|
||||
args.control_plane_token.clone(),
|
||||
caches,
|
||||
locks,
|
||||
wake_compute_endpoint_rate_limiter,
|
||||
);
|
||||
|
||||
let backend = ConsoleRedirectBackend::new(url, api);
|
||||
let config = Box::leak(Box::new(backend));
|
||||
|
||||
Ok(Either::Right(config))
|
||||
|
||||
@@ -12,10 +12,8 @@ use tokio::sync::Mutex;
|
||||
use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
|
||||
use crate::auth::{check_peer_addr_is_in_list, AuthError, IpPattern};
|
||||
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::ext::LockExt;
|
||||
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
|
||||
@@ -58,9 +56,6 @@ pub(crate) enum CancelError {
|
||||
|
||||
#[error("IP is not allowed")]
|
||||
IpNotAllowed,
|
||||
|
||||
#[error("Authentication backend error")]
|
||||
AuthError(#[from] AuthError),
|
||||
}
|
||||
|
||||
impl ReportableError for CancelError {
|
||||
@@ -73,7 +68,6 @@ impl ReportableError for CancelError {
|
||||
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
|
||||
CancelError::RateLimit => crate::error::ErrorKind::RateLimit,
|
||||
CancelError::IpNotAllowed => crate::error::ErrorKind::User,
|
||||
CancelError::AuthError(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -108,7 +102,10 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancelling only in notification, will be removed
|
||||
/// Try to cancel a running query for the corresponding connection.
|
||||
/// If the cancellation key is not found, it will be published to Redis.
|
||||
/// check_allowed - if true, check if the IP is allowed to cancel the query
|
||||
/// return Result primarily for tests
|
||||
pub(crate) async fn cancel_session(
|
||||
&self,
|
||||
key: CancelKeyData,
|
||||
@@ -137,8 +134,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
}
|
||||
|
||||
// NB: we should immediately release the lock after cloning the token.
|
||||
let cancel_state = self.map.get(&key).and_then(|x| x.clone());
|
||||
let Some(cancel_closure) = cancel_state else {
|
||||
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
|
||||
tracing::warn!("query cancellation key not found: {key}");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
@@ -189,96 +185,6 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
cancel_closure.try_cancel_query(self.compute_config).await
|
||||
}
|
||||
|
||||
/// Try to cancel a running query for the corresponding connection.
|
||||
/// If the cancellation key is not found, it will be published to Redis.
|
||||
/// check_allowed - if true, check if the IP is allowed to cancel the query.
|
||||
/// Will fetch IP allowlist internally.
|
||||
///
|
||||
/// return Result primarily for tests
|
||||
pub(crate) async fn cancel_session_auth<T: BackendIpAllowlist>(
|
||||
&self,
|
||||
key: CancelKeyData,
|
||||
ctx: RequestContext,
|
||||
check_allowed: bool,
|
||||
auth_backend: &T,
|
||||
) -> Result<(), CancelError> {
|
||||
// TODO: check for unspecified address is only for backward compatibility, should be removed
|
||||
if !ctx.peer_addr().is_unspecified() {
|
||||
let subnet_key = match ctx.peer_addr() {
|
||||
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
if !self.limiter.lock_propagate_poison().check(subnet_key, 1) {
|
||||
// log only the subnet part of the IP address to know which subnet is rate limited
|
||||
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
.inc(CancellationRequest {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::RateLimitExceeded,
|
||||
});
|
||||
return Err(CancelError::RateLimit);
|
||||
}
|
||||
}
|
||||
|
||||
// NB: we should immediately release the lock after cloning the token.
|
||||
let cancel_state = self.map.get(&key).and_then(|x| x.clone());
|
||||
let Some(cancel_closure) = cancel_state else {
|
||||
tracing::warn!("query cancellation key not found: {key}");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
.inc(CancellationRequest {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::NotFound,
|
||||
});
|
||||
|
||||
if ctx.session_id() == Uuid::nil() {
|
||||
// was already published, do not publish it again
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self
|
||||
.client
|
||||
.try_publish(key, ctx.session_id(), ctx.peer_addr())
|
||||
.await
|
||||
{
|
||||
Ok(()) => {} // do nothing
|
||||
Err(e) => {
|
||||
// log it here since cancel_session could be spawned in a task
|
||||
tracing::error!("failed to publish cancellation key: {key}, error: {e}");
|
||||
return Err(CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
e.to_string(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let ip_allowlist = auth_backend
|
||||
.get_allowed_ips(&ctx, &cancel_closure.user_info)
|
||||
.await
|
||||
.map_err(CancelError::AuthError)?;
|
||||
|
||||
if check_allowed && !check_peer_addr_is_in_list(&ctx.peer_addr(), &ip_allowlist) {
|
||||
// log it here since cancel_session could be spawned in a task
|
||||
tracing::warn!("IP is not allowed to cancel the query: {key}");
|
||||
return Err(CancelError::IpNotAllowed);
|
||||
}
|
||||
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
.inc(CancellationRequest {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::Found,
|
||||
});
|
||||
info!("cancelling query per user's request using key {key}");
|
||||
cancel_closure.try_cancel_query(self.compute_config).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn contains(&self, session: &Session<P>) -> bool {
|
||||
self.map.contains_key(&session.key)
|
||||
@@ -342,7 +248,6 @@ pub struct CancelClosure {
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String, // for pg_sni router
|
||||
user_info: ComputeUserInfo,
|
||||
}
|
||||
|
||||
impl CancelClosure {
|
||||
@@ -351,14 +256,12 @@ impl CancelClosure {
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
hostname: String,
|
||||
user_info: ComputeUserInfo,
|
||||
) -> Self {
|
||||
Self {
|
||||
socket_addr,
|
||||
cancel_token,
|
||||
ip_allowlist,
|
||||
hostname,
|
||||
user_info,
|
||||
}
|
||||
}
|
||||
/// Cancels the query running on user's compute node.
|
||||
@@ -385,8 +288,6 @@ impl CancelClosure {
|
||||
debug!("query was cancelled");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Obsolete (will be removed after moving CancelMap to Redis), only for notifications
|
||||
pub(crate) fn set_ip_allowlist(&mut self, ip_allowlist: Vec<IpPattern>) {
|
||||
self.ip_allowlist = ip_allowlist;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::parse_endpoint_param;
|
||||
use crate::cancellation::CancelClosure;
|
||||
use crate::config::ComputeConfig;
|
||||
@@ -24,10 +23,8 @@ use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumDbConnectionsGuard};
|
||||
use crate::proxy::neon_option;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::types::Host;
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
|
||||
@@ -287,28 +284,6 @@ impl ConnCfg {
|
||||
self.0.get_ssl_mode()
|
||||
);
|
||||
|
||||
let compute_info = match parameters.get("user") {
|
||||
Some(user) => {
|
||||
match parameters.get("database") {
|
||||
Some(database) => {
|
||||
ComputeUserInfo {
|
||||
user: RoleName::from(user),
|
||||
options: NeonOptions::default(), // just a shim, we don't need options
|
||||
endpoint: EndpointId::from(database),
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("compute node didn't return database name");
|
||||
ComputeUserInfo::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("compute node didn't return user name");
|
||||
ComputeUserInfo::default()
|
||||
}
|
||||
};
|
||||
|
||||
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
|
||||
// Yet another reason to rework the connection establishing code.
|
||||
let cancel_closure = CancelClosure::new(
|
||||
@@ -319,9 +294,8 @@ impl ConnCfg {
|
||||
process_id,
|
||||
secret_key,
|
||||
},
|
||||
vec![], // TODO: deprecated, will be removed
|
||||
vec![],
|
||||
host.to_string(),
|
||||
compute_info,
|
||||
);
|
||||
|
||||
let connection = PostgresConnection {
|
||||
|
||||
@@ -159,7 +159,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let request_gauge = metrics.connection_requests.guard(proto);
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
|
||||
let record_handshake_error = !ctx.has_private_peer_addr();
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
|
||||
@@ -172,20 +171,23 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let ctx = ctx.clone();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
cancellation_handler_clone
|
||||
.cancel_session_auth(
|
||||
cancel_key_data,
|
||||
ctx,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
backend,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
|
||||
}.instrument(cancel_span)
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::rate_limiter::WakeComputeRateLimiter;
|
||||
use crate::types::{EndpointCacheKey, EndpointId};
|
||||
use crate::{compute, http, scram};
|
||||
|
||||
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
|
||||
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NeonControlPlaneClient {
|
||||
@@ -78,30 +78,15 @@ impl NeonControlPlaneClient {
|
||||
info!("endpoint is not valid, skipping the request");
|
||||
return Ok(AuthInfo::default());
|
||||
}
|
||||
self.do_get_auth_req(user_info, &ctx.session_id(), Some(ctx))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_get_auth_req(
|
||||
&self,
|
||||
user_info: &ComputeUserInfo,
|
||||
session_id: &uuid::Uuid,
|
||||
ctx: Option<&RequestContext>,
|
||||
) -> Result<AuthInfo, GetAuthInfoError> {
|
||||
let request_id: String = session_id.to_string();
|
||||
let application_name = if let Some(ctx) = ctx {
|
||||
ctx.console_application_name()
|
||||
} else {
|
||||
"auth_cancellation".to_string()
|
||||
};
|
||||
|
||||
let request_id = ctx.session_id().to_string();
|
||||
let application_name = ctx.console_application_name();
|
||||
async {
|
||||
let request = self
|
||||
.endpoint
|
||||
.get_path("get_endpoint_access_control")
|
||||
.header(X_REQUEST_ID, &request_id)
|
||||
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", session_id)])
|
||||
.query(&[("session_id", ctx.session_id())])
|
||||
.query(&[
|
||||
("application_name", application_name.as_str()),
|
||||
("endpointish", user_info.endpoint.as_str()),
|
||||
@@ -111,16 +96,9 @@ impl NeonControlPlaneClient {
|
||||
|
||||
debug!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let response = match ctx {
|
||||
Some(ctx) => {
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
|
||||
let rsp = self.endpoint.execute(request).await;
|
||||
drop(pause);
|
||||
rsp?
|
||||
}
|
||||
None => self.endpoint.execute(request).await?,
|
||||
};
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = match parse_body::<GetEndpointAccessControl>(response).await {
|
||||
Ok(body) => body,
|
||||
|
||||
@@ -273,20 +273,23 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let ctx = ctx.clone();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
cancellation_handler_clone
|
||||
.cancel_session_auth(
|
||||
cancel_key_data,
|
||||
ctx,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
auth_backend,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
|
||||
}.instrument(cancel_span)
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
@@ -312,7 +315,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
};
|
||||
|
||||
let user = user_info.get_user().to_owned();
|
||||
let (user_info, ip_allowlist) = match user_info
|
||||
let user_info = match user_info
|
||||
.authenticate(
|
||||
ctx,
|
||||
&mut stream,
|
||||
@@ -353,8 +356,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
node.cancel_closure
|
||||
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
|
||||
let session = cancellation_handler.get_session();
|
||||
prepare_client_connection(&node, &session, &mut stream).await?;
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ struct NotificationHeader<'a> {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[serde(tag = "topic", content = "data")]
|
||||
// Message to contributors: Make sure to align these topic names with the list below.
|
||||
pub(crate) enum Notification {
|
||||
#[serde(
|
||||
rename = "/allowed_ips_updated",
|
||||
@@ -73,9 +74,21 @@ pub(crate) enum Notification {
|
||||
PasswordUpdate { password_update: PasswordUpdate },
|
||||
#[serde(rename = "/cancel_session")]
|
||||
Cancel(CancelSession),
|
||||
}
|
||||
|
||||
#[serde(other, skip_serializing)]
|
||||
UnknownTopic,
|
||||
/// Returns true if the topic name given is a known topic that we can deserialize and action on.
|
||||
/// Returns false otherwise.
|
||||
fn known_topic(s: &str) -> bool {
|
||||
// Message to contributors: Make sure to align these topic names with the enum above.
|
||||
matches!(
|
||||
s,
|
||||
"/allowed_ips_updated"
|
||||
| "/block_public_or_vpc_access_updated"
|
||||
| "/allowed_vpc_endpoints_updated_for_org"
|
||||
| "/allowed_vpc_endpoints_updated_for_projects"
|
||||
| "/password_updated"
|
||||
| "/cancel_session"
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
@@ -165,29 +178,32 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
let payload: String = msg.get_payload()?;
|
||||
tracing::debug!(?payload, "received a message payload");
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(Notification::UnknownTopic) => {
|
||||
match serde_json::from_str::<NotificationHeader>(&payload) {
|
||||
// don't update the metric for redis errors if it's just a topic we don't know about.
|
||||
Ok(header) => tracing::warn!(topic = header.topic, "unknown topic"),
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!("broken message: {e}");
|
||||
}
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
// For better error handling, we first parse the payload to extract the topic.
|
||||
// If there's a topic we don't support, we can handle that error more gracefully.
|
||||
let header: NotificationHeader = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
match serde_json::from_str::<NotificationHeader>(&payload) {
|
||||
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
|
||||
Err(_) => tracing::error!("broken message: {e}"),
|
||||
};
|
||||
tracing::error!("broken message: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if !known_topic(header.topic) {
|
||||
// don't update the metric for redis errors if it's just a topic we don't know about.
|
||||
tracing::warn!(topic = header.topic, "unknown topic");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
|
||||
channel: msg.get_channel_name(),
|
||||
});
|
||||
tracing::error!(topic = header.topic, "broken message: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
@@ -262,8 +278,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
invalidate_cache(cache, msg);
|
||||
});
|
||||
}
|
||||
|
||||
Notification::UnknownTopic => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -290,7 +304,6 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
|
||||
// https://github.com/neondatabase/neon/pull/10073
|
||||
}
|
||||
Notification::UnknownTopic => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,10 +51,12 @@ use utils::{
|
||||
#[global_allocator]
|
||||
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
|
||||
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[export_name = "malloc_conf"]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
|
||||
// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
|
||||
// TODO: disabled because concurrent CPU profiles cause seg faults. See:
|
||||
// https://github.com/neondatabase/neon/issues/10225.
|
||||
//#[allow(non_upper_case_globals)]
|
||||
//#[export_name = "malloc_conf"]
|
||||
//pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
|
||||
|
||||
const PID_FILE_NAME: &str = "safekeeper.pid";
|
||||
const ID_FILE_NAME: &str = "safekeeper.id";
|
||||
@@ -254,7 +256,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -636,7 +636,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -112,14 +112,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
|
||||
let mut locked = self.entities.lock().unwrap();
|
||||
let entry = locked.entry(key).or_default().clone();
|
||||
let mut guard = TracingExclusiveGuard::new(entry.try_write_owned().ok()?);
|
||||
*guard.guard = Some(operation);
|
||||
Some(guard)
|
||||
}
|
||||
|
||||
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
|
||||
/// periodic housekeeping to avoid the map growing indefinitely
|
||||
pub(crate) fn housekeeping(&self) {
|
||||
|
||||
@@ -188,7 +188,7 @@ impl Secrets {
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
@@ -221,6 +221,12 @@ fn main() -> anyhow::Result<()> {
|
||||
async fn async_main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
|
||||
let _guard = logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
preinitialize_metrics();
|
||||
|
||||
let args = Cli::parse();
|
||||
|
||||
@@ -97,7 +97,6 @@ pub(crate) enum DatabaseOperation {
|
||||
TenantGenerations,
|
||||
ShardGenerations,
|
||||
ListTenantShards,
|
||||
LoadTenant,
|
||||
InsertTenantShards,
|
||||
UpdateTenantShard,
|
||||
DeleteTenant,
|
||||
@@ -331,40 +330,11 @@ impl Persistence {
|
||||
|
||||
/// At startup, load the high level state for shards, such as their config + policy. This will
|
||||
/// be enriched at runtime with state discovered on pageservers.
|
||||
///
|
||||
/// We exclude shards configured to be detached. During startup, if we see any attached locations
|
||||
/// for such shards, they will automatically be detached as 'orphans'.
|
||||
pub(crate) async fn load_active_tenant_shards(
|
||||
&self,
|
||||
) -> DatabaseResult<Vec<TenantShardPersistence>> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::ListTenantShards,
|
||||
move |conn| -> DatabaseResult<_> {
|
||||
let query = tenant_shards.filter(
|
||||
placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
|
||||
);
|
||||
let result = query.load::<TenantShardPersistence>(conn)?;
|
||||
|
||||
Ok(result)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// When restoring a previously detached tenant into memory, load it from the database
|
||||
pub(crate) async fn load_tenant(
|
||||
&self,
|
||||
filter_tenant_id: TenantId,
|
||||
) -> DatabaseResult<Vec<TenantShardPersistence>> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::LoadTenant,
|
||||
move |conn| -> DatabaseResult<_> {
|
||||
let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string()));
|
||||
let result = query.load::<TenantShardPersistence>(conn)?;
|
||||
|
||||
Ok(result)
|
||||
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff::exponential_backoff;
|
||||
use utils::failpoint_support;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -211,12 +212,11 @@ impl Reconciler {
|
||||
lazy: bool,
|
||||
) -> Result<(), ReconcileError> {
|
||||
if !node.is_available() && config.mode == LocationConfigMode::Detached {
|
||||
// [`crate::service::Service::node_activate_reconcile`] will update the observed state
|
||||
// when the node comes back online. At that point, the intent and observed states will
|
||||
// be mismatched and a background reconciliation will detach.
|
||||
tracing::info!(
|
||||
"Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
|
||||
);
|
||||
// Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
|
||||
// will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
|
||||
// what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
|
||||
tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
|
||||
self.observed.locations.remove(&node.get_id());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -749,8 +749,6 @@ impl Reconciler {
|
||||
};
|
||||
|
||||
if increment_generation {
|
||||
pausable_failpoint!("reconciler-pre-increment-generation");
|
||||
|
||||
let generation = self
|
||||
.persistence
|
||||
.increment_generation(self.tenant_shard_id, node.get_id())
|
||||
@@ -826,7 +824,7 @@ impl Reconciler {
|
||||
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
|
||||
}
|
||||
|
||||
pausable_failpoint!("reconciler-epilogue");
|
||||
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -83,7 +83,6 @@ use utils::{
|
||||
generation::Generation,
|
||||
http::error::ApiError,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
pausable_failpoint,
|
||||
sync::gate::Gate,
|
||||
};
|
||||
|
||||
@@ -155,7 +154,6 @@ enum TenantOperations {
|
||||
TimelineArchivalConfig,
|
||||
TimelineDetachAncestor,
|
||||
TimelineGcBlockUnblock,
|
||||
DropDetached,
|
||||
}
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
@@ -417,8 +415,8 @@ pub struct Service {
|
||||
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
|
||||
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
|
||||
///
|
||||
/// Note that this state logically lives inside ServiceState, but carrying Sender here makes the code simpler
|
||||
/// by avoiding needing a &mut ref to something inside the ServiceState. This could be optimized to
|
||||
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
|
||||
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
|
||||
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
|
||||
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
|
||||
|
||||
@@ -1026,8 +1024,6 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
pausable_failpoint!("heartbeat-pre-node-state-configure");
|
||||
|
||||
// This is the code path for geniune availability transitions (i.e node
|
||||
// goes unavailable and/or comes back online).
|
||||
let res = self
|
||||
@@ -1166,20 +1162,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
// If we just finished detaching all shards for a tenant, it might be time to drop it from memory.
|
||||
if tenant.policy == PlacementPolicy::Detached {
|
||||
// We may only drop a tenant from memory while holding the exclusive lock on the tenant ID: this protects us
|
||||
// from concurrent execution wrt a request handler that might expect the tenant to remain in memory for the
|
||||
// duration of the request.
|
||||
let guard = self.tenant_op_locks.try_exclusive(
|
||||
tenant.tenant_shard_id.tenant_id,
|
||||
TenantOperations::DropDetached,
|
||||
);
|
||||
if let Some(guard) = guard {
|
||||
self.maybe_drop_tenant(tenant.tenant_shard_id.tenant_id, &mut locked, &guard);
|
||||
}
|
||||
}
|
||||
|
||||
// Maybe some other work can proceed now that this job finished.
|
||||
if self.reconciler_concurrency.available_permits() > 0 {
|
||||
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
|
||||
@@ -1309,7 +1291,7 @@ impl Service {
|
||||
.set(nodes.len() as i64);
|
||||
|
||||
tracing::info!("Loading shards from database...");
|
||||
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
|
||||
let mut tenant_shard_persistence = persistence.list_tenant_shards().await?;
|
||||
tracing::info!(
|
||||
"Loaded {} shards from database.",
|
||||
tenant_shard_persistence.len()
|
||||
@@ -1561,14 +1543,8 @@ impl Service {
|
||||
// the pageserver API (not via this service), we will auto-create any missing tenant
|
||||
// shards with default state.
|
||||
let insert = {
|
||||
match self
|
||||
.maybe_load_tenant(attach_req.tenant_shard_id.tenant_id, &_tenant_lock)
|
||||
.await
|
||||
{
|
||||
Ok(_) => false,
|
||||
Err(ApiError::NotFound(_)) => true,
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
let locked = self.inner.write().unwrap();
|
||||
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
|
||||
};
|
||||
|
||||
if insert {
|
||||
@@ -2460,99 +2436,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
/// For APIs that might act on tenants with [`PlacementPolicy::Detached`], first check if
|
||||
/// the tenant is present in memory. If not, load it from the database. If it is found
|
||||
/// in neither location, return a NotFound error.
|
||||
///
|
||||
/// Caller must demonstrate they hold a lock guard, as otherwise two callers might try and load
|
||||
/// it at the same time, or we might race with [`Self::maybe_drop_tenant`]
|
||||
async fn maybe_load_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
_guard: &TracingExclusiveGuard<TenantOperations>,
|
||||
) -> Result<(), ApiError> {
|
||||
let present_in_memory = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
.is_some()
|
||||
};
|
||||
|
||||
if present_in_memory {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let tenant_shards = self.persistence.load_tenant(tenant_id).await?;
|
||||
if tenant_shards.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running
|
||||
// compute, so no benefit to making AZ sticky across detaches.
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
tracing::info!(
|
||||
"Loaded {} shards for tenant {}",
|
||||
tenant_shards.len(),
|
||||
tenant_id
|
||||
);
|
||||
|
||||
locked.tenants.extend(tenant_shards.into_iter().map(|p| {
|
||||
let intent = IntentState::new();
|
||||
let shard =
|
||||
TenantShard::from_persistent(p, intent).expect("Corrupt shard row in database");
|
||||
|
||||
// Sanity check: when loading on-demand, we should always be loaded something Detached
|
||||
debug_assert!(shard.policy == PlacementPolicy::Detached);
|
||||
if shard.policy != PlacementPolicy::Detached {
|
||||
tracing::error!(
|
||||
"Tenant shard {} loaded on-demand, but has non-Detached policy {:?}",
|
||||
shard.tenant_shard_id,
|
||||
shard.policy
|
||||
);
|
||||
}
|
||||
|
||||
(shard.tenant_shard_id, shard)
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If all shards for a tenant are detached, and in a fully quiescent state (no observed locations on pageservers),
|
||||
/// and have no reconciler running, then we can drop the tenant from memory. It will be reloaded on-demand
|
||||
/// if we are asked to attach it again (see [`Self::maybe_load_tenant`]).
|
||||
///
|
||||
/// Caller must demonstrate they hold a lock guard, as otherwise it is unsafe to drop a tenant from
|
||||
/// memory while some other function might assume it continues to exist while not holding the lock on Self::inner.
|
||||
fn maybe_drop_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
locked: &mut std::sync::RwLockWriteGuard<ServiceState>,
|
||||
_guard: &TracingExclusiveGuard<TenantOperations>,
|
||||
) {
|
||||
let mut tenant_shards = locked.tenants.range(TenantShardId::tenant_range(tenant_id));
|
||||
if tenant_shards.all(|(_id, shard)| {
|
||||
shard.policy == PlacementPolicy::Detached
|
||||
&& shard.reconciler.is_none()
|
||||
&& shard.observed.is_empty()
|
||||
}) {
|
||||
let keys = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.map(|(id, _)| id)
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
for key in keys {
|
||||
tracing::info!("Dropping detached tenant shard {} from memory", key);
|
||||
locked.tenants.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This API is used by the cloud control plane to migrate unsharded tenants that it created
|
||||
/// directly with pageservers into this service.
|
||||
///
|
||||
@@ -2579,26 +2462,14 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
let tenant_id = if !tenant_shard_id.is_unsharded() {
|
||||
if !tenant_shard_id.is_unsharded() {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"This API is for importing single-sharded or unsharded tenants"
|
||||
)));
|
||||
} else {
|
||||
tenant_shard_id.tenant_id
|
||||
};
|
||||
|
||||
// In case we are waking up a Detached tenant
|
||||
match self.maybe_load_tenant(tenant_id, &_tenant_lock).await {
|
||||
Ok(()) | Err(ApiError::NotFound(_)) => {
|
||||
// This is a creation or an update
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// First check if this is a creation or an update
|
||||
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);
|
||||
let create_or_update = self.tenant_location_config_prepare(tenant_shard_id.tenant_id, req);
|
||||
|
||||
let mut result = TenantLocationConfigResponse {
|
||||
shards: Vec::new(),
|
||||
@@ -2621,7 +2492,6 @@ impl Service {
|
||||
// Persist updates
|
||||
// Ordering: write to the database before applying changes in-memory, so that
|
||||
// we will not appear time-travel backwards on a restart.
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for ShardUpdate {
|
||||
tenant_shard_id,
|
||||
@@ -2726,8 +2596,6 @@ impl Service {
|
||||
let tenant_id = req.tenant_id;
|
||||
let patch = req.config;
|
||||
|
||||
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
|
||||
|
||||
let base = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let shards = locked
|
||||
@@ -2772,7 +2640,19 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.maybe_load_tenant(req.tenant_id, &_tenant_lock).await?;
|
||||
let tenant_exists = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut r = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(req.tenant_id));
|
||||
r.next().is_some()
|
||||
};
|
||||
|
||||
if !tenant_exists {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {} not found", req.tenant_id).into(),
|
||||
));
|
||||
}
|
||||
|
||||
self.set_tenant_config_and_reconcile(req.tenant_id, req.config)
|
||||
.await
|
||||
@@ -3065,8 +2945,6 @@ impl Service {
|
||||
let _tenant_lock =
|
||||
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
|
||||
|
||||
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
|
||||
|
||||
// Detach all shards. This also deletes local pageserver shard data.
|
||||
let (detach_waiters, node) = {
|
||||
let mut detach_waiters = Vec::new();
|
||||
@@ -3186,8 +3064,6 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
|
||||
|
||||
failpoint_support::sleep_millis_async!("tenant-update-policy-exclusive-lock");
|
||||
|
||||
let TenantPolicyRequest {
|
||||
@@ -5270,13 +5146,11 @@ impl Service {
|
||||
)));
|
||||
}
|
||||
|
||||
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
|
||||
persistent_shards
|
||||
.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
|
||||
|
||||
let mut shards = self.persistence.list_tenant_shards().await?;
|
||||
shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
|
||||
expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
|
||||
|
||||
if persistent_shards != expect_shards {
|
||||
if shards != expect_shards {
|
||||
tracing::error!("Consistency check failed on shards.");
|
||||
tracing::error!(
|
||||
"Shards in memory: {}",
|
||||
@@ -5285,7 +5159,7 @@ impl Service {
|
||||
);
|
||||
tracing::error!(
|
||||
"Shards in database: {}",
|
||||
serde_json::to_string(&persistent_shards)
|
||||
serde_json::to_string(&shards)
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -6241,10 +6115,6 @@ impl Service {
|
||||
let mut pending_reconciles = 0;
|
||||
let mut az_violations = 0;
|
||||
|
||||
// If we find any tenants to drop from memory, stash them to offload after
|
||||
// we're done traversing the map of tenants.
|
||||
let mut drop_detached_tenants = Vec::new();
|
||||
|
||||
let mut reconciles_spawned = 0;
|
||||
for shard in tenants.values_mut() {
|
||||
// Accumulate scheduling statistics
|
||||
@@ -6278,25 +6148,6 @@ impl Service {
|
||||
// Shard wanted to reconcile but for some reason couldn't.
|
||||
pending_reconciles += 1;
|
||||
}
|
||||
|
||||
// If this tenant is detached, try dropping it from memory. This is usually done
|
||||
// proactively in [`Self::process_results`], but we do it here to handle the edge
|
||||
// case where a reconcile completes while someone else is holding an op lock for the tenant.
|
||||
if shard.tenant_shard_id.shard_number == ShardNumber(0)
|
||||
&& shard.policy == PlacementPolicy::Detached
|
||||
{
|
||||
if let Some(guard) = self.tenant_op_locks.try_exclusive(
|
||||
shard.tenant_shard_id.tenant_id,
|
||||
TenantOperations::DropDetached,
|
||||
) {
|
||||
drop_detached_tenants.push((shard.tenant_shard_id.tenant_id, guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process any deferred tenant drops
|
||||
for (tenant_id, guard) in drop_detached_tenants {
|
||||
self.maybe_drop_tenant(tenant_id, &mut locked, &guard);
|
||||
}
|
||||
|
||||
metrics::METRICS_REGISTRY
|
||||
|
||||
@@ -465,10 +465,6 @@ impl ObservedState {
|
||||
locations: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.locations.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl TenantShard {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use itertools::Itertools;
|
||||
use pageserver::tenant::checks::check_valid_layermap;
|
||||
@@ -89,14 +88,9 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
match s3_data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation: _,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time,
|
||||
index_part_snapshot_time,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} => {
|
||||
// Ignore missing file error if index_part downloaded is different from the one when listing the layer files.
|
||||
let ignore_error = index_part_snapshot_time < index_part_last_modified_time
|
||||
&& !cfg!(debug_assertions);
|
||||
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
|
||||
result
|
||||
.errors
|
||||
@@ -177,7 +171,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
is_l0,
|
||||
);
|
||||
|
||||
if is_l0 || ignore_error {
|
||||
if is_l0 {
|
||||
result.warnings.push(msg);
|
||||
} else {
|
||||
result.errors.push(msg);
|
||||
@@ -314,8 +308,6 @@ pub(crate) enum BlobDataParseResult {
|
||||
Parsed {
|
||||
index_part: Box<IndexPart>,
|
||||
index_part_generation: Generation,
|
||||
index_part_last_modified_time: SystemTime,
|
||||
index_part_snapshot_time: SystemTime,
|
||||
s3_layers: HashSet<(LayerName, Generation)>,
|
||||
},
|
||||
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
|
||||
@@ -492,9 +484,9 @@ async fn list_timeline_blobs_impl(
|
||||
}
|
||||
|
||||
if let Some(index_part_object_key) = index_part_object.as_ref() {
|
||||
let (index_part_bytes, index_part_last_modified_time) =
|
||||
let index_part_bytes =
|
||||
match download_object_with_retries(remote_client, &index_part_object_key.key).await {
|
||||
Ok(data) => data,
|
||||
Ok(index_part_bytes) => index_part_bytes,
|
||||
Err(e) => {
|
||||
// It is possible that the branch gets deleted in-between we list the objects
|
||||
// and we download the index part file.
|
||||
@@ -508,7 +500,7 @@ async fn list_timeline_blobs_impl(
|
||||
));
|
||||
}
|
||||
};
|
||||
let index_part_snapshot_time = index_part_object_key.last_modified;
|
||||
|
||||
match serde_json::from_slice(&index_part_bytes) {
|
||||
Ok(index_part) => {
|
||||
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
|
||||
@@ -516,8 +508,6 @@ async fn list_timeline_blobs_impl(
|
||||
index_part: Box::new(index_part),
|
||||
index_part_generation,
|
||||
s3_layers,
|
||||
index_part_last_modified_time,
|
||||
index_part_snapshot_time,
|
||||
},
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
@@ -635,7 +625,7 @@ pub(crate) async fn list_tenant_manifests(
|
||||
|
||||
let manifest_bytes =
|
||||
match download_object_with_retries(remote_client, &latest_listing_object.key).await {
|
||||
Ok((bytes, _)) => bytes,
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
// It is possible that the tenant gets deleted in-between we list the objects
|
||||
// and we download the manifest file.
|
||||
|
||||
@@ -13,7 +13,7 @@ pub mod tenant_snapshot;
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::retry::{RetryConfigBuilder, RetryMode};
|
||||
@@ -509,11 +509,10 @@ async fn list_objects_with_retries(
|
||||
panic!("MAX_RETRIES is not allowed to be 0");
|
||||
}
|
||||
|
||||
/// Returns content, last modified time
|
||||
async fn download_object_with_retries(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
key: &RemotePath,
|
||||
) -> anyhow::Result<(Vec<u8>, SystemTime)> {
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let cancel = CancellationToken::new();
|
||||
for trial in 0..MAX_RETRIES {
|
||||
let mut buf = Vec::new();
|
||||
@@ -536,7 +535,7 @@ async fn download_object_with_retries(
|
||||
{
|
||||
Ok(bytes_read) => {
|
||||
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
|
||||
return Ok((buf, download.last_modified));
|
||||
return Ok(buf);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to stream object body for key {key}: {e}");
|
||||
|
||||
@@ -450,8 +450,6 @@ async fn gc_ancestor(
|
||||
index_part: _,
|
||||
index_part_generation: _,
|
||||
s3_layers,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} => s3_layers,
|
||||
BlobDataParseResult::Relic => {
|
||||
// Post-deletion tenant location: don't try and GC it.
|
||||
@@ -588,9 +586,7 @@ async fn gc_timeline(
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
s3_layers: _s3_layers,
|
||||
} => (index_part, *index_part_generation, data.unused_index_keys),
|
||||
BlobDataParseResult::Relic => {
|
||||
// Post-deletion tenant location: don't try and GC it.
|
||||
|
||||
@@ -47,8 +47,6 @@ impl MetadataSummary {
|
||||
index_part,
|
||||
index_part_generation: _,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} = &data.blob_data
|
||||
{
|
||||
*self
|
||||
@@ -197,9 +195,7 @@ pub async fn scan_pageserver_metadata(
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
s3_layers: _s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
if index_part.deleted_at.is_some() {
|
||||
@@ -322,11 +318,9 @@ pub async fn scan_pageserver_metadata(
|
||||
|
||||
match &data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part: _,
|
||||
index_part: _index_part,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
|
||||
@@ -268,8 +268,6 @@ impl SnapshotDownloader {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} => {
|
||||
self.download_timeline(
|
||||
ttid,
|
||||
|
||||
@@ -2521,7 +2521,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
self,
|
||||
extra_env_vars: dict[str, str] | None = None,
|
||||
timeout_in_seconds: int | None = None,
|
||||
await_active: bool = True,
|
||||
) -> Self:
|
||||
"""
|
||||
Start the page server.
|
||||
@@ -2548,10 +2547,8 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
)
|
||||
self.running = True
|
||||
|
||||
if (
|
||||
await_active
|
||||
and self.env.storage_controller.running
|
||||
and self.env.storage_controller.node_registered(self.id)
|
||||
if self.env.storage_controller.running and self.env.storage_controller.node_registered(
|
||||
self.id
|
||||
):
|
||||
self.env.storage_controller.poll_node_status(
|
||||
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
|
||||
@@ -4933,30 +4930,13 @@ def check_restored_datadir_content(
|
||||
assert (mismatch, error) == ([], [])
|
||||
|
||||
|
||||
def logical_replication_sync(
|
||||
subscriber: PgProtocol,
|
||||
publisher: PgProtocol,
|
||||
sub_dbname: str | None = None,
|
||||
pub_dbname: str | None = None,
|
||||
) -> Lsn:
|
||||
def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn:
|
||||
"""Wait logical replication subscriber to sync with publisher."""
|
||||
if pub_dbname is not None:
|
||||
publisher_lsn = Lsn(
|
||||
publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0]
|
||||
)
|
||||
else:
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
while True:
|
||||
if sub_dbname is not None:
|
||||
res = subscriber.safe_psql(
|
||||
"select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname
|
||||
)[0][0]
|
||||
else:
|
||||
res = subscriber.safe_psql(
|
||||
"select latest_end_lsn from pg_catalog.pg_stat_subscription"
|
||||
)[0][0]
|
||||
|
||||
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
|
||||
0
|
||||
]
|
||||
if res:
|
||||
log.info(f"subscriber_lsn={res}")
|
||||
subscriber_lsn = Lsn(res)
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
import requests
|
||||
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
{
|
||||
@@ -138,115 +136,3 @@ def test_compute_create_databases(neon_simple_env: NeonEnv):
|
||||
assert curr_db is not None
|
||||
assert len(curr_db) == 1
|
||||
assert curr_db[0] == db["name"]
|
||||
|
||||
|
||||
def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can drop a database that has a logical replication subscription.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
# Create and start endpoint so that neon_local put all the generated
|
||||
# stuff into the spec.json file.
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
{
|
||||
"name": "neondb",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "subscriber_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "publisher_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
]
|
||||
|
||||
# Update the spec.json file to create the databases
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
|
||||
# connect to the publisher_db and create a publication
|
||||
with endpoint.cursor(dbname="publisher_db") as cursor:
|
||||
cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES")
|
||||
cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
|
||||
cursor.execute("CREATE TABLE t(a int)")
|
||||
cursor.execute("INSERT INTO t VALUES (1)")
|
||||
|
||||
# connect to the subscriber_db and create a subscription
|
||||
# Note that we need to create subscription with
|
||||
connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
|
||||
with endpoint.cursor(dbname="subscriber_db") as cursor:
|
||||
cursor.execute("CREATE TABLE t(a int)")
|
||||
cursor.execute(
|
||||
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
|
||||
)
|
||||
|
||||
# wait for the subscription to be active
|
||||
logical_replication_sync(
|
||||
endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db"
|
||||
)
|
||||
|
||||
# Check that replication is working
|
||||
with endpoint.cursor(dbname="subscriber_db") as cursor:
|
||||
cursor.execute("SELECT * FROM t")
|
||||
rows = cursor.fetchall()
|
||||
assert len(rows) == 1
|
||||
assert rows[0][0] == 1
|
||||
|
||||
# drop the subscriber_db from the list
|
||||
TEST_DB_NAMES_NEW = [
|
||||
{
|
||||
"name": "neondb",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
{
|
||||
"name": "publisher_db",
|
||||
"owner": "cloud_admin",
|
||||
},
|
||||
]
|
||||
# Update the spec.json file to drop the database
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES_NEW,
|
||||
},
|
||||
"delta_operations": [
|
||||
{"action": "delete_db", "name": "subscriber_db"},
|
||||
# also test the case when we try to delete a non-existent database
|
||||
# shouldn't happen in normal operation,
|
||||
# but can occur when failed operations are retried
|
||||
{"action": "delete_db", "name": "nonexistent_db"},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
logging.info("Reconfiguring the endpoint to drop the subscriber_db")
|
||||
endpoint.reconfigure()
|
||||
|
||||
# Check that the subscriber_db is dropped
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",))
|
||||
catalog_db = cursor.fetchone()
|
||||
assert catalog_db is None
|
||||
|
||||
# Check that we can still connect to the publisher_db
|
||||
with endpoint.cursor(dbname="publisher_db") as cursor:
|
||||
cursor.execute("SELECT * FROM current_database()")
|
||||
curr_db = cursor.fetchone()
|
||||
assert curr_db is not None
|
||||
assert len(curr_db) == 1
|
||||
assert curr_db[0] == "publisher_db"
|
||||
|
||||
@@ -17,7 +17,6 @@ from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_AZ_ID,
|
||||
LogCursor,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
@@ -2407,14 +2406,7 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
env.storage_controller.tenant_create(tid)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
|
||||
|
||||
def unpause_failpoint():
|
||||
time.sleep(2)
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
|
||||
|
||||
thread = threading.Thread(target=unpause_failpoint)
|
||||
thread.start()
|
||||
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
|
||||
|
||||
# Make a change to the tenant config to trigger a slow reconcile
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
@@ -2429,8 +2421,6 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
observed_state = env.storage_controller.step_down()
|
||||
log.info(f"Storage controller stepped down with {observed_state=}")
|
||||
|
||||
thread.join()
|
||||
|
||||
# Validate that we waited for the slow reconcile to complete
|
||||
# and updated the observed state in the storcon before stepping down.
|
||||
node_id = str(env.pageserver.id)
|
||||
@@ -3299,221 +3289,8 @@ def test_storage_controller_detached_stopped(
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Confirm the detach happened
|
||||
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||
def test_storage_controller_detach_lifecycle(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test that detached tenants are handled properly through their lifecycle: getting dropped
|
||||
from memory when detached, then getting loaded back on-demand.
|
||||
"""
|
||||
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
neon_env_builder.num_pageservers = 1
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id,
|
||||
shard_count=1,
|
||||
)
|
||||
virtual_ps_http.timeline_create(PgVersion.NOT_SET, tenant_id, timeline_id)
|
||||
|
||||
remote_prefix = "/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
)
|
||||
# We will later check data is gone after deletion, so as a control check that it is present to begin with
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix=remote_prefix,
|
||||
)
|
||||
|
||||
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
|
||||
assert len(env.storage_controller.tenant_list()) == 1
|
||||
|
||||
# Detach the tenant
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
# Ensure reconciles are done (the one we do inline in location_conf is advisory and if it takes too long that API just succeeds anyway)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Confirm the detach happened on pageserver
|
||||
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||
# Confirm the tenant is not in memory on the controller
|
||||
assert env.storage_controller.tenant_list() == []
|
||||
|
||||
# The detached tenant does not get loaded into memory across a controller restart
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
assert env.storage_controller.tenant_list() == []
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# The detached tenant can be re-attached
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedSingle",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
|
||||
assert len(env.storage_controller.tenant_list()) == 1
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Detach it again before doing deletion
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# A detached tenant can be deleted
|
||||
virtual_ps_http.tenant_delete(tenant_id)
|
||||
|
||||
# Such deletions really work (empty remote storage)
|
||||
assert_prefix_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix=remote_prefix,
|
||||
)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||
def test_storage_controller_node_flap_detach_race(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Reproducer for https://github.com/neondatabase/neon/issues/10253.
|
||||
|
||||
When a node's availability flaps, the reconciliations spawned by the node
|
||||
going offline may race with the reconciliation done when then node comes
|
||||
back online.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 4
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id,
|
||||
shard_count=2,
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]
|
||||
|
||||
def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
|
||||
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
|
||||
assert res
|
||||
return res[1]
|
||||
|
||||
# Stop the nodes which host attached shards.
|
||||
# This will trigger reconciliations which pause before incrmenenting the generation,
|
||||
# and, more importantly, updating the `generation_pageserver` of the shards.
|
||||
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
|
||||
for node_id in stopped_nodes:
|
||||
env.get_pageserver(node_id).stop(immediate=True)
|
||||
|
||||
def failure_handled() -> LogCursor:
|
||||
stop_offset = None
|
||||
|
||||
for node_id in stopped_nodes:
|
||||
res = env.storage_controller.log_contains(f"node {node_id} going offline")
|
||||
assert res
|
||||
stop_offset = res[1]
|
||||
|
||||
assert stop_offset
|
||||
return stop_offset
|
||||
|
||||
offset = wait_until(failure_handled)
|
||||
|
||||
# Now restart the nodes and make them pause before marking themselves as available
|
||||
# or running the activation reconciliation.
|
||||
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))
|
||||
|
||||
for node_id in stopped_nodes:
|
||||
env.get_pageserver(node_id).start(await_active=False)
|
||||
|
||||
offset = wait_until(
|
||||
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
|
||||
)
|
||||
|
||||
# The nodes have restarted and are waiting to perform activaction reconciliation.
|
||||
# Unpause the initial reconciliation triggered by the nodes going offline.
|
||||
# It will attempt to detach from the old location, but notice that the old location
|
||||
# is not yet available, and then stop before processing the results of the reconciliation.
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
|
||||
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))
|
||||
|
||||
offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))
|
||||
|
||||
# Let the nodes perform activation reconciliation while still holding up processing the result
|
||||
# from the initial reconcile triggered by going offline.
|
||||
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))
|
||||
|
||||
def activate_reconciliation_done():
|
||||
for node_id in stopped_nodes:
|
||||
assert env.storage_controller.log_contains(
|
||||
f"Node {node_id} transition to active", offset=offset
|
||||
)
|
||||
|
||||
wait_until(activate_reconciliation_done)
|
||||
|
||||
# Finally, allow the initial reconcile to finish up.
|
||||
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
|
||||
|
||||
# Give things a chance to settle and validate that no stale locations exist
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
def validate_locations():
|
||||
shard_locations = defaultdict(list)
|
||||
for ps in env.pageservers:
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
shard_locations[loc[0]].append(
|
||||
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
|
||||
)
|
||||
|
||||
log.info(f"Shard locations: {shard_locations}")
|
||||
|
||||
attached_locations = {
|
||||
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
|
||||
for k, v in shard_locations.items()
|
||||
}
|
||||
|
||||
for shard, locs in attached_locations.items():
|
||||
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"
|
||||
|
||||
wait_until(validate_locations, timeout=10)
|
||||
|
||||
@@ -27,6 +27,7 @@ camino = { version = "1", default-features = false, features = ["serde1"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
clap = { version = "4", features = ["derive", "env", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
|
||||
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
|
||||
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
|
||||
@@ -42,6 +43,7 @@ generic-array = { version = "0.14", default-features = false, features = ["more_
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
half = { version = "2", default-features = false, features = ["num-traits"] }
|
||||
hashbrown = { version = "0.14", features = ["raw"] }
|
||||
hdrhistogram = { version = "7" }
|
||||
hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["full"] }
|
||||
@@ -85,7 +87,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["full", "test-util"] }
|
||||
tokio = { version = "1", features = ["full", "test-util", "tracing"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
@@ -94,6 +96,7 @@ tonic = { version = "0.12", features = ["tls-roots"] }
|
||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
|
||||
Reference in New Issue
Block a user