Compare commits

..

9 Commits

Author SHA1 Message Date
Christian Schwarz
3e455e1680 problem & non-solution: cargo hakari enables tokio/tracing feature 2025-01-07 15:11:48 +01:00
Christian Schwarz
91812e7b00 cleanups & self-review 2025-01-07 15:07:56 +01:00
Christian Schwarz
74bfaee67e fix doc string 2025-01-07 14:57:20 +01:00
Christian Schwarz
5288c8ca35 hakari 2025-01-07 14:54:51 +01:00
Christian Schwarz
ecb1cb21aa make all compile modes work
cargo check --tests -p utils
cargo check --tests -p utils  --features tracing-based-debugging
RUSTFLAGS="--cfg tokio_unstable" cargo check --tests -p utils --features tracing-based-debugging
2025-01-07 14:53:29 +01:00
Christian Schwarz
7109db0e58 add doc comment explaining tracing-based debugging 2025-01-07 14:46:13 +01:00
Christian Schwarz
151d07674c add support for tokio-console & make deps opt-in via feature 2025-01-07 14:29:36 +01:00
Christian Schwarz
1f94e31025 use .with(Option) 2025-01-07 13:51:00 +01:00
Christian Schwarz
803b765c76 utils::logging: implement tracing_chrome & tracing_flame support 2025-01-07 12:31:07 +01:00
44 changed files with 525 additions and 1414 deletions

View File

@@ -1,12 +0,0 @@
rust_code: ['**/*.rs', '**/Cargo.toml', '**/Cargo.lock']
v14: ['vendor/postgres-v14/**', 'Makefile', 'pgxn/**']
v15: ['vendor/postgres-v15/**', 'Makefile', 'pgxn/**']
v16: ['vendor/postgres-v16/**', 'Makefile', 'pgxn/**']
v17: ['vendor/postgres-v17/**', 'Makefile', 'pgxn/**']
rebuild_neon_extra:
- .github/workflows/neon_extra_builds.yml
rebuild_macos:
- .github/workflows/build-macos.yml

View File

@@ -1,241 +0,0 @@
name: Check neon with MacOS builds
on:
workflow_call:
inputs:
pg_versions:
description: "Array of the pg versions to build for, for example: ['v14', 'v17']"
type: string
default: '[]'
required: false
rebuild_rust_code:
description: "Rebuild Rust code"
type: boolean
default: false
required: false
rebuild_everything:
description: "If true, rebuild for all versions"
type: boolean
default: false
required: false
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
# TODO: move `check-*` and `files-changed` jobs to the "Caller" Workflow
# We should care about that as Github has limitations:
# - You can connect up to four levels of workflows
# - You can call a maximum of 20 unique reusable workflows from a single workflow file.
# https://docs.github.com/en/actions/sharing-automations/reusing-workflows#limitations
jobs:
build-pgxn:
if: |
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
strategy:
matrix:
postgres-version: ${{ inputs.rebuild_everything && fromJson('["v14", "v15", "v16", "v17"]') || fromJSON(inputs.pg_versions) }}
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Checkout main repo
uses: actions/checkout@v4
- name: Set pg ${{ matrix.postgres-version }} for caching
id: pg_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-${{ matrix.postgres-version }}) | tee -a "${GITHUB_OUTPUT}"
- name: Cache postgres ${{ matrix.postgres-version }} build
id: cache_pg
uses: actions/cache@v4
with:
path: pg_install/${{ matrix.postgres-version }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Checkout submodule vendor/postgres-${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
git submodule init vendor/postgres-${{ matrix.postgres-version }}
git submodule update --depth 1 --recursive
- name: Install build dependencies
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Build Postgres ${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make postgres-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
- name: Build Neon Pg Ext ${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make "neon-pg-ext-${{ matrix.postgres-version }}" -j$(sysctl -n hw.ncpu)
- name: Get postgres headers ${{ matrix.postgres-version }}
if: steps.cache_pg.outputs.cache-hit != 'true'
run: |
make postgres-headers-${{ matrix.postgres-version }} -j$(sysctl -n hw.ncpu)
build-walproposer-lib:
if: |
(inputs.pg_versions != '[]' || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Checkout main repo
uses: actions/checkout@v4
- name: Set pg v17 for caching
id: pg_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
- name: Cache postgres v17 build
id: cache_pg
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@v4
with:
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Checkout submodule vendor/postgres-v17
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
git submodule init vendor/postgres-v17
git submodule update --depth 1 --recursive
- name: Install build dependencies
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Build walproposer-lib (only for v17)
if: steps.cache_walproposer_lib.outputs.cache-hit != 'true'
run:
make walproposer-lib -j$(sysctl -n hw.ncpu)
cargo-build:
if: |
(inputs.pg_versions != '[]' || inputs.rebuild_rust_code || inputs.rebuild_everything) && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
timeout-minutes: 30
runs-on: macos-15
needs: [build-pgxn, build-walproposer-lib]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Checkout main repo
uses: actions/checkout@v4
with:
submodules: true
- name: Set pg v14 for caching
id: pg_rev_v14
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v15 for caching
id: pg_rev_v15
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v16 for caching
id: pg_rev_v16
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) | tee -a "${GITHUB_OUTPUT}"
- name: Set pg v17 for caching
id: pg_rev_v17
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) | tee -a "${GITHUB_OUTPUT}"
- name: Cache postgres v14 build
id: cache_pg
uses: actions/cache@v4
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_v15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_v16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_v17
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache cargo deps (only for v17)
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
- name: Cache walproposer-lib
id: cache_walproposer_lib
uses: actions/cache@v4
with:
path: pg_install/build/walproposer-lib
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Install build dependencies
run: |
brew install flex bison openssl protobuf icu4c
- name: Set extra env for macOS
run: |
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
- name: Run cargo build (only for v17)
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release -j$(sysctl -n hw.ncpu)
- name: Check that no warnings are produced (only for v17)
run: ./run_clippy.sh

View File

@@ -979,16 +979,6 @@ jobs:
VERSIONS: v14 v15 v16 v17
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 3600
- name: Login to Amazon Dev ECR
uses: aws-actions/amazon-ecr-login@v2
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}

View File

@@ -31,15 +31,19 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
files-changed:
name: Detect what files changed
runs-on: ubuntu-22.04
timeout-minutes: 3
outputs:
v17: ${{ steps.files_changed.outputs.v17 }}
postgres_changes: ${{ steps.postgres_changes.outputs.changes }}
rebuild_rust_code: ${{ steps.files_changed.outputs.rust_code }}
rebuild_everything: ${{ steps.files_changed.outputs.rebuild_neon_extra || steps.files_changed.outputs.rebuild_macos }}
check-macos-build:
needs: [ check-permissions ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
timeout-minutes: 90
runs-on: macos-15
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
steps:
- name: Checkout
@@ -47,45 +51,106 @@ jobs:
with:
submodules: true
- name: Check for Postgres changes
uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3
id: files_changed
- name: Install macOS postgres dependencies
run: brew install flex bison openssl protobuf icu4c
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
- name: Set pg 15 revision for caching
id: pg_v15_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
- name: Set pg 16 revision for caching
id: pg_v16_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
- name: Set pg 17 revision for caching
id: pg_v17_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v4
with:
token: ${{ github.token }}
filters: .github/file-filters.yaml
base: ${{ github.event_name != 'pull_request' && (github.event.merge_group.base_ref || github.ref_name) || '' }}
ref: ${{ github.event_name != 'pull_request' && (github.event.merge_group.head_ref || github.ref) || '' }}
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Filter out only v-string for build matrix
id: postgres_changes
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v4
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v4
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v17 build
id: cache_pg_17
uses: actions/cache@v4
with:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Set extra env for macOS
run: |
v_strings_only_as_json_array=$(echo ${{ steps.files_changed.outputs.chnages }} | jq '.[]|select(test("v\\d+"))' | jq --slurp -c)
echo "changes=${v_strings_only_as_json_array}" | tee -a "${GITHUB_OUTPUT}"
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
check-macos-build:
needs: [ check-permissions, files-changed ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
uses: ./.github/workflows/build-macos.yml
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ needs.files-changed.outputs.rebuild_rust_code }}
rebuild_everything: ${{ fromJson(needs.files-changed.outputs.rebuild_everything) }}
- name: Cache cargo deps
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: make postgres-v14 -j$(sysctl -n hw.ncpu)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: make postgres-v15 -j$(sysctl -n hw.ncpu)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: make postgres-v16 -j$(sysctl -n hw.ncpu)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: make postgres-v17 -j$(sysctl -n hw.ncpu)
- name: Build neon extensions
run: make neon-pg-ext -j$(sysctl -n hw.ncpu)
- name: Build walproposer-lib
run: make walproposer-lib -j$(sysctl -n hw.ncpu)
- name: Run cargo build
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release
- name: Check that no warnings are produced
run: ./run_clippy.sh
gather-rust-build-stats:
needs: [ check-permissions, build-build-tools-image, files-changed ]
needs: [ check-permissions, build-build-tools-image ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
if: |
(needs.files-changed.outputs.v17 == 'true' || needs.files-changed.outputs.rebuild_everything == 'true') && (
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
)
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
runs-on: [ self-hosted, large ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm

85
Cargo.lock generated
View File

@@ -1323,6 +1323,45 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857"
dependencies = [
"futures-core",
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"hyper-util",
"prost",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@@ -3513,6 +3552,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -3808,6 +3857,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -6636,6 +6691,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -6967,6 +7023,17 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "tracing-chrome"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf0a738ed5d6450a9fb96e86a23ad808de2b727fd1394585da5cdd6788ffe724"
dependencies = [
"serde_json",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.33"
@@ -6987,6 +7054,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
@@ -7033,6 +7111,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -7242,6 +7321,7 @@ dependencies = [
"camino",
"camino-tempfile",
"chrono",
"console-subscriber",
"const_format",
"criterion",
"diatomic-waker",
@@ -7283,7 +7363,9 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"tracing-chrome",
"tracing-error",
"tracing-flame",
"tracing-subscriber",
"url",
"uuid",
@@ -7797,6 +7879,7 @@ dependencies = [
"chrono",
"clap",
"clap_builder",
"crossbeam-utils",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
@@ -7813,6 +7896,7 @@ dependencies = [
"getrandom 0.2.11",
"half",
"hashbrown 0.14.5",
"hdrhistogram",
"hex",
"hmac",
"hyper 0.14.30",
@@ -7870,6 +7954,7 @@ dependencies = [
"tower",
"tracing",
"tracing-core",
"tracing-subscriber",
"url",
"zerocopy",
"zeroize",

View File

@@ -69,7 +69,7 @@ enum EncryptionSecret {
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
let _guard = utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,

View File

@@ -15,7 +15,7 @@ use std::time::Instant;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::spec::{Database, PgIdent, Role};
use compute_api::spec::{PgIdent, Role};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -45,10 +45,8 @@ use crate::spec_apply::ApplySpecPhase::{
DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase;
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
HandleAnonExtension,
ChangeSchemaPerms, DeleteDBRoleReferences, HandleAnonExtension,
};
use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
use crate::sync_sk::{check_if_synced, ping_safekeeper};
@@ -836,7 +834,7 @@ impl ComputeNode {
conf
}
pub async fn get_maintenance_client(
async fn get_maintenance_client(
conf: &tokio_postgres::Config,
) -> Result<tokio_postgres::Client> {
let mut conf = conf.clone();
@@ -945,78 +943,6 @@ impl ComputeNode {
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropSubscriptionsForDeletedDatabases].to_vec(),
);
Ok(spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
@@ -1036,7 +962,7 @@ impl ComputeNode {
.await?;
}
info!("Applying RunInEachDatabase2 phase");
info!("Applying RunInEachDatabase phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
@@ -1071,12 +997,6 @@ impl ComputeNode {
jwks_roles.clone(),
concurrency_token.clone(),
db,
[
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
]
.to_vec(),
);
Ok(spawn(fut))
@@ -1123,13 +1043,16 @@ impl ComputeNode {
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
for subphase in [
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
] {
apply_operations(
spec.clone(),
ctx.clone(),

View File

@@ -47,7 +47,6 @@ pub enum PerDatabasePhase {
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
DropSubscriptionsForDeletedDatabases,
}
#[derive(Clone, Debug)]
@@ -327,12 +326,13 @@ async fn get_operations<'a>(
// Use FORCE to drop database even if there are active connections.
// We run this from `cloud_admin`, so it should have enough privileges.
//
// NB: there could be other db states, which prevent us from dropping
// the database. For example, if db is used by any active subscription
// or replication slot.
// Such cases are handled in the DropSubscriptionsForDeletedDatabases
// phase. We do all the cleanup before actually dropping the database.
// TODO: deal with it once we allow logical replication. Proper fix should
// involve returning an error code to the control plane, so it could
// figure out that this is a non-retryable error, return it to the user
// and fail operation permanently.
let drop_db_query: String = format!(
"DROP DATABASE IF EXISTS {} WITH (FORCE)",
&op.name.pg_quote()
@@ -444,30 +444,6 @@ async fn get_operations<'a>(
}
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
match subphase {
PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
match &db {
DB::UserDB(db) => {
let drop_subscription_query: String = format!(
include_str!("sql/drop_subscription_for_drop_dbs.sql"),
datname_str = escape_literal(&db.name),
);
let operations = vec![Operation {
query: drop_subscription_query,
comment: Some(format!(
"optionally dropping subscriptions for DB {}",
db.name,
)),
}]
.into_iter();
Ok(Box::new(operations))
}
// skip this cleanup for the system databases
// because users can't drop them
DB::SystemDB => Ok(Box::new(empty())),
}
}
PerDatabasePhase::DeleteDBRoleReferences => {
let ctx = ctx.read().await;

View File

@@ -1,11 +0,0 @@
DO $$
DECLARE
subname TEXT;
BEGIN
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
END LOOP;
END;
$$;

View File

@@ -154,7 +154,7 @@ mod reliable_copy_test {
/// Run test simulations.
#[test]
fn sim_example_reliable_copy() {
utils::logging::init(
let _guard = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,

View File

@@ -13,7 +13,7 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
static LOGGING_DONE: OnceCell<()> = OnceCell::new();
static LOGGING_DONE: OnceCell<utils::logging::FlushGuard> = OnceCell::new();
pub(crate) fn upload_stream(
content: std::borrow::Cow<'static, [u8]>,
@@ -210,6 +210,6 @@ pub(crate) fn ensure_logging_ready() {
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
.expect("logging init failed")
});
}

View File

@@ -10,6 +10,10 @@ default = []
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
# Enables debugging functionality that's based on the `tracing` crate,
# e.g., tokio-console or tracing-chrome.
tracing-based-debugging = [ "console-subscriber", "tracing-chrome", "tracing-flame", "tokio/tracing" ]
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
@@ -20,6 +24,7 @@ bincode.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
console-subscriber = { version = "0.4.1", optional = true }
diatomic-waker.workspace = true
flate2.workspace = true
git-version.workspace = true
@@ -47,7 +52,9 @@ tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = ["serde"] }
tracing.workspace = true
tracing-chrome = { version = "0.7.2", optional = true }
tracing-error.workspace = true
tracing-flame = { version = "0.2.0", optional = true }
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
rand.workspace = true
scopeguard.workspace = true
@@ -78,6 +85,9 @@ camino-tempfile.workspace = true
serde_assert.workspace = true
tokio = { workspace = true, features = ["test-util"] }
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[[bench]]
name = "benchmarks"
harness = false

View File

@@ -1,3 +1,10 @@
#[cfg(feature = "tracing-based-debugging")]
use std::{
io::BufWriter,
num::NonZeroUsize,
sync::{Arc, Mutex},
};
use std::str::FromStr;
use anyhow::Context;
@@ -98,11 +105,44 @@ pub enum Output {
Stderr,
}
/// Keep alive and drop it before the program terminates.
#[allow(dead_code)] // We need to store the `Arc<>` for drop semantics.
#[must_use]
#[cfg(feature = "tracing-based-debugging")]
pub struct FlushGuard(Arc<Mutex<FlushGuardInner>>);
#[cfg(feature = "tracing-based-debugging")]
struct FlushGuardInner {
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
}
#[cfg(not(feature = "tracing-based-debugging"))]
pub struct FlushGuard;
/// Initialize the global tracing subscriber.
///
/// # Tracing-Based Debugging
///
/// If feature `tracing-based-debugging` is enabled, this function will add support
/// for runtime enablement of various tracing-based debugging tools.
///
/// The feature is disabled by default to avoid compile time bloat.
///
/// For example, to use the `tracing-chrome` crate to debug pageserver:
///
/// 1. Enable the feature by adding `tracing-based-debugging` to the `features` list in
/// the pageserver crate's `Cargo.toml`.
/// 2. Build pageserver.
/// 3. Launch pageserver with env var `NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME=1`.
/// 4. Cleanly shut down pageserver.
/// 5. Follow instructions of the `tracing-chrome` crate to post-process and visualize
/// the trace files.
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
output: Output,
) -> anyhow::Result<()> {
) -> anyhow::Result<FlushGuard> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -113,7 +153,9 @@ pub fn init(
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
@@ -131,17 +173,95 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
}
Ok(())
let r = r.with(match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => {
Some(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
}
TracingErrorLayerEnablement::Disabled => None,
});
#[cfg(feature = "tracing-based-debugging")]
let (r, guard) = {
let tracing_chrome_layer_flush_guard;
let r = r.with(
if crate::env::var("NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME").unwrap_or(false) {
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.trace_style(tracing_chrome::TraceStyle::Async)
.build();
tracing_chrome_layer_flush_guard = Some(guard);
Some(layer.with_filter(rust_log_env_filter()))
} else {
tracing_chrome_layer_flush_guard = None;
None
},
);
let tracing_flame_flush_guard;
let r = r.with(
if crate::env::var("NEON_UTILS_LOGGING_ENABLE_TRACING_FLAME").unwrap_or(false) {
let (layer, guard) =
tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let layer = layer
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
tracing_flame_flush_guard = Some(guard);
Some(layer.with_filter(rust_log_env_filter()))
} else {
tracing_flame_flush_guard = None;
None
},
);
let r = {
let varname = "NEON_UTILS_LOGGING_ENABLE_TOKIO_CONSOLE";
let console_subscriber_config: Option<NonZeroUsize> = crate::env::var(varname);
#[cfg(tokio_unstable)]
{
r.with(match console_subscriber_config {
Some(n) => {
use console_subscriber::ConsoleLayer;
Some(
console_subscriber::Builder::default()
.event_buffer_capacity(
n.get() * ConsoleLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
)
.client_buffer_capacity(
n.get() * ConsoleLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
)
.spawn(),
)
}
None => None,
})
}
#[cfg(not(tokio_unstable))]
if console_subscriber_config.is_some() {
panic!("recompile with --cfg tokio_unstable to enable {varname}");
} else {
r
}
};
(
r,
FlushGuard(Arc::new(Mutex::new(FlushGuardInner {
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
_tracing_flame_layer: tracing_flame_flush_guard,
}))),
)
};
#[cfg(not(feature = "tracing-based-debugging"))]
let (r, guard) = (r, FlushGuard);
r.init();
Ok(guard)
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -3,7 +3,7 @@ use pageserver_compaction::interface::CompactionLayer;
use pageserver_compaction::simulator::MockTimeline;
use utils::logging;
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {

View File

@@ -115,7 +115,7 @@ struct AnalyzeLayerMapCmd {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
let _guard = logging::init(
LogFormat::Plain,
TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,

View File

@@ -32,7 +32,7 @@ enum Args {
}
fn main() {
logging::init(
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,

View File

@@ -53,10 +53,12 @@ project_build_tag!(BUILD_TAG);
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
// TODO: disabled because concurrent CPU profiles cause seg faults. See:
// https://github.com/neondatabase/neon/issues/10225.
//#[allow(non_upper_case_globals)]
//#[export_name = "malloc_conf"]
//pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
const PID_FILE_NAME: &str = "pageserver.pid";
@@ -112,7 +114,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
let _guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,

View File

@@ -5562,7 +5562,7 @@ pub(crate) mod harness {
pub deletion_queue: MockDeletionQueue,
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {

View File

@@ -1,18 +1,16 @@
use async_trait::async_trait;
use postgres_client::config::SslMode;
use pq_proto::BeMessage as Be;
use std::fmt;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
use super::{ComputeCredentialKeys, ControlPlaneApi};
use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
use super::ComputeCredentialKeys;
use crate::auth::IpPattern;
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::{self, client::cplane_proxy_v1, CachedNodeInfo, NodeInfo};
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::stream::PqStream;
@@ -33,13 +31,6 @@ pub(crate) enum ConsoleRedirectError {
#[derive(Debug)]
pub struct ConsoleRedirectBackend {
console_uri: reqwest::Url,
api: cplane_proxy_v1::NeonControlPlaneClient,
}
impl fmt::Debug for cplane_proxy_v1::NeonControlPlaneClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "NeonControlPlaneClient")
}
}
impl UserFacingError for ConsoleRedirectError {
@@ -80,24 +71,9 @@ pub(crate) fn new_psql_session_id() -> String {
hex::encode(rand::random::<[u8; 8]>())
}
#[async_trait]
impl BackendIpAllowlist for ConsoleRedirectBackend {
async fn get_allowed_ips(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> auth::Result<Vec<auth::IpPattern>> {
self.api
.get_allowed_ips_and_secret(ctx, user_info)
.await
.map(|(ips, _)| ips.as_ref().clone())
.map_err(|e| e.into())
}
}
impl ConsoleRedirectBackend {
pub fn new(console_uri: reqwest::Url, api: cplane_proxy_v1::NeonControlPlaneClient) -> Self {
Self { console_uri, api }
pub fn new(console_uri: reqwest::Url) -> Self {
Self { console_uri }
}
pub(crate) async fn authenticate(

View File

@@ -16,9 +16,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info, warn};
use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::{
self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern,
};
use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint};
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
@@ -133,7 +131,7 @@ pub(crate) struct ComputeUserInfoNoEndpoint {
pub(crate) options: NeonOptions,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub(crate) struct ComputeUserInfo {
pub(crate) endpoint: EndpointId,
pub(crate) user: RoleName,
@@ -246,15 +244,6 @@ impl AuthenticationConfig {
}
}
#[async_trait::async_trait]
pub(crate) trait BackendIpAllowlist {
async fn get_allowed_ips(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> auth::Result<Vec<auth::IpPattern>>;
}
/// True to its name, this function encapsulates our current auth trade-offs.
/// Here, we choose the appropriate auth flow based on circumstances.
///
@@ -267,7 +256,7 @@ async fn auth_quirks(
allow_cleartext: bool,
config: &'static AuthenticationConfig,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> auth::Result<(ComputeCredentials, Option<Vec<IpPattern>>)> {
) -> auth::Result<ComputeCredentials> {
// If there's no project so far, that entails that client doesn't
// support SNI or other means of passing the endpoint (project) name.
// We now expect to see a very specific payload in the place of password.
@@ -326,7 +315,7 @@ async fn auth_quirks(
)
.await
{
Ok(keys) => Ok((keys, Some(allowed_ips.as_ref().clone()))),
Ok(keys) => Ok(keys),
Err(e) => {
if e.is_password_failed() {
// The password could have been changed, so we invalidate the cache.
@@ -396,7 +385,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
allow_cleartext: bool,
config: &'static AuthenticationConfig,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> auth::Result<(Backend<'a, ComputeCredentials>, Option<Vec<IpPattern>>)> {
) -> auth::Result<Backend<'a, ComputeCredentials>> {
let res = match self {
Self::ControlPlane(api, user_info) => {
debug!(
@@ -405,7 +394,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
"performing authentication using the console"
);
let (credentials, ip_allowlist) = auth_quirks(
let credentials = auth_quirks(
ctx,
&*api,
user_info,
@@ -415,7 +404,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
endpoint_rate_limiter,
)
.await?;
Ok((Backend::ControlPlane(api, credentials), ip_allowlist))
Backend::ControlPlane(api, credentials)
}
Self::Local(_) => {
return Err(auth::AuthError::bad_auth_method("invalid for local proxy"))
@@ -424,7 +413,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
// TODO: replace with some metric
info!("user successfully authenticated");
res
Ok(res)
}
}
@@ -452,24 +441,6 @@ impl Backend<'_, ComputeUserInfo> {
}
}
#[async_trait::async_trait]
impl BackendIpAllowlist for Backend<'_, ()> {
async fn get_allowed_ips(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> auth::Result<Vec<auth::IpPattern>> {
let auth_data = match self {
Self::ControlPlane(api, ()) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
};
auth_data
.map(|(ips, _)| ips.as_ref().clone())
.map_err(|e| e.into())
}
}
#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
async fn wake_compute(
@@ -815,7 +786,7 @@ mod tests {
.await
.unwrap();
assert_eq!(creds.0.info.endpoint, "my-endpoint");
assert_eq!(creds.info.endpoint, "my-endpoint");
handle.await.unwrap();
}

View File

@@ -744,59 +744,9 @@ fn build_auth_backend(
}
AuthBackendType::ConsoleRedirect => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
let endpoint_cache_config: config::EndpointCacheConfig =
args.endpoint_cache_config.parse()?;
let url = args.uri.parse()?;
let backend = ConsoleRedirectBackend::new(url);
info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
info!(
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
);
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
wake_compute_cache_config,
project_info_cache_config,
endpoint_cache_config,
)));
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
"wake_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().wake_compute_lock,
)?));
let url = args.uri.clone().parse()?;
let ep_url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(ep_url, http::new_client());
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
// Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter
// and locks are not used in ConsoleRedirectBackend,
// but they are required by the NeonControlPlaneClient
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);
let backend = ConsoleRedirectBackend::new(url, api);
let config = Box::leak(Box::new(backend));
Ok(Either::Right(config))

View File

@@ -12,10 +12,8 @@ use tokio::sync::Mutex;
use tracing::{debug, info};
use uuid::Uuid;
use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
use crate::auth::{check_peer_addr_is_in_list, AuthError, IpPattern};
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
@@ -58,9 +56,6 @@ pub(crate) enum CancelError {
#[error("IP is not allowed")]
IpNotAllowed,
#[error("Authentication backend error")]
AuthError(#[from] AuthError),
}
impl ReportableError for CancelError {
@@ -73,7 +68,6 @@ impl ReportableError for CancelError {
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
CancelError::RateLimit => crate::error::ErrorKind::RateLimit,
CancelError::IpNotAllowed => crate::error::ErrorKind::User,
CancelError::AuthError(_) => crate::error::ErrorKind::ControlPlane,
}
}
}
@@ -108,7 +102,10 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
}
}
/// Cancelling only in notification, will be removed
/// Try to cancel a running query for the corresponding connection.
/// If the cancellation key is not found, it will be published to Redis.
/// check_allowed - if true, check if the IP is allowed to cancel the query
/// return Result primarily for tests
pub(crate) async fn cancel_session(
&self,
key: CancelKeyData,
@@ -137,8 +134,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
}
// NB: we should immediately release the lock after cloning the token.
let cancel_state = self.map.get(&key).and_then(|x| x.clone());
let Some(cancel_closure) = cancel_state else {
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
tracing::warn!("query cancellation key not found: {key}");
Metrics::get()
.proxy
@@ -189,96 +185,6 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
cancel_closure.try_cancel_query(self.compute_config).await
}
/// Try to cancel a running query for the corresponding connection.
/// If the cancellation key is not found, it will be published to Redis.
/// check_allowed - if true, check if the IP is allowed to cancel the query.
/// Will fetch IP allowlist internally.
///
/// return Result primarily for tests
pub(crate) async fn cancel_session_auth<T: BackendIpAllowlist>(
&self,
key: CancelKeyData,
ctx: RequestContext,
check_allowed: bool,
auth_backend: &T,
) -> Result<(), CancelError> {
// TODO: check for unspecified address is only for backward compatibility, should be removed
if !ctx.peer_addr().is_unspecified() {
let subnet_key = match ctx.peer_addr() {
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
};
if !self.limiter.lock_propagate_poison().check(subnet_key, 1) {
// log only the subnet part of the IP address to know which subnet is rate limited
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
Metrics::get()
.proxy
.cancellation_requests_total
.inc(CancellationRequest {
source: self.from,
kind: crate::metrics::CancellationOutcome::RateLimitExceeded,
});
return Err(CancelError::RateLimit);
}
}
// NB: we should immediately release the lock after cloning the token.
let cancel_state = self.map.get(&key).and_then(|x| x.clone());
let Some(cancel_closure) = cancel_state else {
tracing::warn!("query cancellation key not found: {key}");
Metrics::get()
.proxy
.cancellation_requests_total
.inc(CancellationRequest {
source: self.from,
kind: crate::metrics::CancellationOutcome::NotFound,
});
if ctx.session_id() == Uuid::nil() {
// was already published, do not publish it again
return Ok(());
}
match self
.client
.try_publish(key, ctx.session_id(), ctx.peer_addr())
.await
{
Ok(()) => {} // do nothing
Err(e) => {
// log it here since cancel_session could be spawned in a task
tracing::error!("failed to publish cancellation key: {key}, error: {e}");
return Err(CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
)));
}
}
return Ok(());
};
let ip_allowlist = auth_backend
.get_allowed_ips(&ctx, &cancel_closure.user_info)
.await
.map_err(CancelError::AuthError)?;
if check_allowed && !check_peer_addr_is_in_list(&ctx.peer_addr(), &ip_allowlist) {
// log it here since cancel_session could be spawned in a task
tracing::warn!("IP is not allowed to cancel the query: {key}");
return Err(CancelError::IpNotAllowed);
}
Metrics::get()
.proxy
.cancellation_requests_total
.inc(CancellationRequest {
source: self.from,
kind: crate::metrics::CancellationOutcome::Found,
});
info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query(self.compute_config).await
}
#[cfg(test)]
fn contains(&self, session: &Session<P>) -> bool {
self.map.contains_key(&session.key)
@@ -342,7 +248,6 @@ pub struct CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String, // for pg_sni router
user_info: ComputeUserInfo,
}
impl CancelClosure {
@@ -351,14 +256,12 @@ impl CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String,
user_info: ComputeUserInfo,
) -> Self {
Self {
socket_addr,
cancel_token,
ip_allowlist,
hostname,
user_info,
}
}
/// Cancels the query running on user's compute node.
@@ -385,8 +288,6 @@ impl CancelClosure {
debug!("query was cancelled");
Ok(())
}
/// Obsolete (will be removed after moving CancelMap to Redis), only for notifications
pub(crate) fn set_ip_allowlist(&mut self, ip_allowlist: Vec<IpPattern>) {
self.ip_allowlist = ip_allowlist;
}

View File

@@ -13,7 +13,6 @@ use thiserror::Error;
use tokio::net::TcpStream;
use tracing::{debug, error, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::config::ComputeConfig;
@@ -24,10 +23,8 @@ use crate::control_plane::messages::MetricsAuxInfo;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::proxy::neon_option;
use crate::proxy::NeonOptions;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::types::Host;
use crate::types::{EndpointId, RoleName};
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -287,28 +284,6 @@ impl ConnCfg {
self.0.get_ssl_mode()
);
let compute_info = match parameters.get("user") {
Some(user) => {
match parameters.get("database") {
Some(database) => {
ComputeUserInfo {
user: RoleName::from(user),
options: NeonOptions::default(), // just a shim, we don't need options
endpoint: EndpointId::from(database),
}
}
None => {
warn!("compute node didn't return database name");
ComputeUserInfo::default()
}
}
}
None => {
warn!("compute node didn't return user name");
ComputeUserInfo::default()
}
};
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
@@ -319,9 +294,8 @@ impl ConnCfg {
process_id,
secret_key,
},
vec![], // TODO: deprecated, will be removed
vec![],
host.to_string(),
compute_info,
);
let connection = PostgresConnection {

View File

@@ -159,7 +159,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
@@ -172,20 +171,23 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let ctx = ctx.clone();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
cancel_span.follows_from(tracing::Span::current());
async move {
cancellation_handler_clone
.cancel_session_auth(
cancel_key_data,
ctx,
config.authentication_config.ip_allowlist_check_enabled,
backend,
)
.await
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
}.instrument(cancel_span)
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.instrument(cancel_span)
.await,
);
}
});
return Ok(None);

View File

@@ -29,7 +29,7 @@ use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, http, scram};
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
#[derive(Clone)]
pub struct NeonControlPlaneClient {
@@ -78,30 +78,15 @@ impl NeonControlPlaneClient {
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
self.do_get_auth_req(user_info, &ctx.session_id(), Some(ctx))
.await
}
async fn do_get_auth_req(
&self,
user_info: &ComputeUserInfo,
session_id: &uuid::Uuid,
ctx: Option<&RequestContext>,
) -> Result<AuthInfo, GetAuthInfoError> {
let request_id: String = session_id.to_string();
let application_name = if let Some(ctx) = ctx {
ctx.console_application_name()
} else {
"auth_cancellation".to_string()
};
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let request = self
.endpoint
.get_path("get_endpoint_access_control")
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", session_id)])
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
@@ -111,16 +96,9 @@ impl NeonControlPlaneClient {
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let response = match ctx {
Some(ctx) => {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let rsp = self.endpoint.execute(request).await;
drop(pause);
rsp?
}
None => self.endpoint.execute(request).await?,
};
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetEndpointAccessControl>(response).await {
Ok(body) => body,

View File

@@ -273,20 +273,23 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let ctx = ctx.clone();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
cancel_span.follows_from(tracing::Span::current());
async move {
cancellation_handler_clone
.cancel_session_auth(
cancel_key_data,
ctx,
config.authentication_config.ip_allowlist_check_enabled,
auth_backend,
)
.await
.inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
}.instrument(cancel_span)
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.instrument(cancel_span)
.await,
);
}
});
return Ok(None);
@@ -312,7 +315,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
};
let user = user_info.get_user().to_owned();
let (user_info, ip_allowlist) = match user_info
let user_info = match user_info
.authenticate(
ctx,
&mut stream,
@@ -353,8 +356,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
.or_else(|e| stream.throw_error(e))
.await?;
node.cancel_closure
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

View File

@@ -37,6 +37,7 @@ struct NotificationHeader<'a> {
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "topic", content = "data")]
// Message to contributors: Make sure to align these topic names with the list below.
pub(crate) enum Notification {
#[serde(
rename = "/allowed_ips_updated",
@@ -73,9 +74,21 @@ pub(crate) enum Notification {
PasswordUpdate { password_update: PasswordUpdate },
#[serde(rename = "/cancel_session")]
Cancel(CancelSession),
}
#[serde(other, skip_serializing)]
UnknownTopic,
/// Returns true if the topic name given is a known topic that we can deserialize and action on.
/// Returns false otherwise.
fn known_topic(s: &str) -> bool {
// Message to contributors: Make sure to align these topic names with the enum above.
matches!(
s,
"/allowed_ips_updated"
| "/block_public_or_vpc_access_updated"
| "/allowed_vpc_endpoints_updated_for_org"
| "/allowed_vpc_endpoints_updated_for_projects"
| "/password_updated"
| "/cancel_session"
)
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
@@ -165,29 +178,32 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");
let msg: Notification = match serde_json::from_str(&payload) {
Ok(Notification::UnknownTopic) => {
match serde_json::from_str::<NotificationHeader>(&payload) {
// don't update the metric for redis errors if it's just a topic we don't know about.
Ok(header) => tracing::warn!(topic = header.topic, "unknown topic"),
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!("broken message: {e}");
}
};
return Ok(());
}
// For better error handling, we first parse the payload to extract the topic.
// If there's a topic we don't support, we can handle that error more gracefully.
let header: NotificationHeader = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
match serde_json::from_str::<NotificationHeader>(&payload) {
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
Err(_) => tracing::error!("broken message: {e}"),
};
tracing::error!("broken message: {e}");
return Ok(());
}
};
if !known_topic(header.topic) {
// don't update the metric for redis errors if it's just a topic we don't know about.
tracing::warn!(topic = header.topic, "unknown topic");
return Ok(());
}
let msg: Notification = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!(topic = header.topic, "broken message: {e}");
return Ok(());
}
};
@@ -262,8 +278,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
invalidate_cache(cache, msg);
});
}
Notification::UnknownTopic => unreachable!(),
}
Ok(())
@@ -290,7 +304,6 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
// https://github.com/neondatabase/neon/pull/10073
}
Notification::UnknownTopic => unreachable!(),
}
}

View File

@@ -51,10 +51,12 @@ use utils::{
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
// Configure jemalloc to sample allocations for profiles every 1 MB (1 << 20).
// TODO: disabled because concurrent CPU profiles cause seg faults. See:
// https://github.com/neondatabase/neon/issues/10225.
//#[allow(non_upper_case_globals)]
//#[export_name = "malloc_conf"]
//pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
const PID_FILE_NAME: &str = "safekeeper.pid";
const ID_FILE_NAME: &str = "safekeeper.id";
@@ -254,7 +256,7 @@ async fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -636,7 +636,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -112,14 +112,6 @@ where
}
}
pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default().clone();
let mut guard = TracingExclusiveGuard::new(entry.try_write_owned().ok()?);
*guard.guard = Some(operation);
Some(guard)
}
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {

View File

@@ -188,7 +188,7 @@ impl Secrets {
}
fn main() -> anyhow::Result<()> {
logging::init(
let _guard = logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
@@ -221,6 +221,12 @@ fn main() -> anyhow::Result<()> {
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
let _guard = logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
preinitialize_metrics();
let args = Cli::parse();

View File

@@ -97,7 +97,6 @@ pub(crate) enum DatabaseOperation {
TenantGenerations,
ShardGenerations,
ListTenantShards,
LoadTenant,
InsertTenantShards,
UpdateTenantShard,
DeleteTenant,
@@ -331,40 +330,11 @@ impl Persistence {
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
///
/// We exclude shards configured to be detached. During startup, if we see any attached locations
/// for such shards, they will automatically be detached as 'orphans'.
pub(crate) async fn load_active_tenant_shards(
&self,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
self.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(
placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
);
let result = query.load::<TenantShardPersistence>(conn)?;
Ok(result)
},
)
.await
}
/// When restoring a previously detached tenant into memory, load it from the database
pub(crate) async fn load_tenant(
&self,
filter_tenant_id: TenantId,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::LoadTenant,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string()));
let result = query.load::<TenantShardPersistence>(conn)?;
Ok(result)
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
},
)
.await

View File

@@ -14,6 +14,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::backoff::exponential_backoff;
use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -211,12 +212,11 @@ impl Reconciler {
lazy: bool,
) -> Result<(), ReconcileError> {
if !node.is_available() && config.mode == LocationConfigMode::Detached {
// [`crate::service::Service::node_activate_reconcile`] will update the observed state
// when the node comes back online. At that point, the intent and observed states will
// be mismatched and a background reconciliation will detach.
tracing::info!(
"Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
);
// Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
// will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
// what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
self.observed.locations.remove(&node.get_id());
return Ok(());
}
@@ -749,8 +749,6 @@ impl Reconciler {
};
if increment_generation {
pausable_failpoint!("reconciler-pre-increment-generation");
let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node.get_id())
@@ -826,7 +824,7 @@ impl Reconciler {
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
}
pausable_failpoint!("reconciler-epilogue");
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
Ok(())
}

View File

@@ -83,7 +83,6 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
pausable_failpoint,
sync::gate::Gate,
};
@@ -155,7 +154,6 @@ enum TenantOperations {
TimelineArchivalConfig,
TimelineDetachAncestor,
TimelineGcBlockUnblock,
DropDetached,
}
#[derive(Clone, strum_macros::Display)]
@@ -417,8 +415,8 @@ pub struct Service {
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
/// Note that this state logically lives inside ServiceState, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceState. This could be optimized to
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
@@ -1026,8 +1024,6 @@ impl Service {
)
.await;
pausable_failpoint!("heartbeat-pre-node-state-configure");
// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
@@ -1166,20 +1162,6 @@ impl Service {
}
}
// If we just finished detaching all shards for a tenant, it might be time to drop it from memory.
if tenant.policy == PlacementPolicy::Detached {
// We may only drop a tenant from memory while holding the exclusive lock on the tenant ID: this protects us
// from concurrent execution wrt a request handler that might expect the tenant to remain in memory for the
// duration of the request.
let guard = self.tenant_op_locks.try_exclusive(
tenant.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
);
if let Some(guard) = guard {
self.maybe_drop_tenant(tenant.tenant_shard_id.tenant_id, &mut locked, &guard);
}
}
// Maybe some other work can proceed now that this job finished.
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
@@ -1309,7 +1291,7 @@ impl Service {
.set(nodes.len() as i64);
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
let mut tenant_shard_persistence = persistence.list_tenant_shards().await?;
tracing::info!(
"Loaded {} shards from database.",
tenant_shard_persistence.len()
@@ -1561,14 +1543,8 @@ impl Service {
// the pageserver API (not via this service), we will auto-create any missing tenant
// shards with default state.
let insert = {
match self
.maybe_load_tenant(attach_req.tenant_shard_id.tenant_id, &_tenant_lock)
.await
{
Ok(_) => false,
Err(ApiError::NotFound(_)) => true,
Err(e) => return Err(e.into()),
}
let locked = self.inner.write().unwrap();
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
};
if insert {
@@ -2460,99 +2436,6 @@ impl Service {
}
}
/// For APIs that might act on tenants with [`PlacementPolicy::Detached`], first check if
/// the tenant is present in memory. If not, load it from the database. If it is found
/// in neither location, return a NotFound error.
///
/// Caller must demonstrate they hold a lock guard, as otherwise two callers might try and load
/// it at the same time, or we might race with [`Self::maybe_drop_tenant`]
async fn maybe_load_tenant(
&self,
tenant_id: TenantId,
_guard: &TracingExclusiveGuard<TenantOperations>,
) -> Result<(), ApiError> {
let present_in_memory = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
.is_some()
};
if present_in_memory {
return Ok(());
}
let tenant_shards = self.persistence.load_tenant(tenant_id).await?;
if tenant_shards.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
));
}
// TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running
// compute, so no benefit to making AZ sticky across detaches.
let mut locked = self.inner.write().unwrap();
tracing::info!(
"Loaded {} shards for tenant {}",
tenant_shards.len(),
tenant_id
);
locked.tenants.extend(tenant_shards.into_iter().map(|p| {
let intent = IntentState::new();
let shard =
TenantShard::from_persistent(p, intent).expect("Corrupt shard row in database");
// Sanity check: when loading on-demand, we should always be loaded something Detached
debug_assert!(shard.policy == PlacementPolicy::Detached);
if shard.policy != PlacementPolicy::Detached {
tracing::error!(
"Tenant shard {} loaded on-demand, but has non-Detached policy {:?}",
shard.tenant_shard_id,
shard.policy
);
}
(shard.tenant_shard_id, shard)
}));
Ok(())
}
/// If all shards for a tenant are detached, and in a fully quiescent state (no observed locations on pageservers),
/// and have no reconciler running, then we can drop the tenant from memory. It will be reloaded on-demand
/// if we are asked to attach it again (see [`Self::maybe_load_tenant`]).
///
/// Caller must demonstrate they hold a lock guard, as otherwise it is unsafe to drop a tenant from
/// memory while some other function might assume it continues to exist while not holding the lock on Self::inner.
fn maybe_drop_tenant(
&self,
tenant_id: TenantId,
locked: &mut std::sync::RwLockWriteGuard<ServiceState>,
_guard: &TracingExclusiveGuard<TenantOperations>,
) {
let mut tenant_shards = locked.tenants.range(TenantShardId::tenant_range(tenant_id));
if tenant_shards.all(|(_id, shard)| {
shard.policy == PlacementPolicy::Detached
&& shard.reconciler.is_none()
&& shard.observed.is_empty()
}) {
let keys = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(id, _)| id)
.copied()
.collect::<Vec<_>>();
for key in keys {
tracing::info!("Dropping detached tenant shard {} from memory", key);
locked.tenants.remove(&key);
}
}
}
/// This API is used by the cloud control plane to migrate unsharded tenants that it created
/// directly with pageservers into this service.
///
@@ -2579,26 +2462,14 @@ impl Service {
)
.await;
let tenant_id = if !tenant_shard_id.is_unsharded() {
if !tenant_shard_id.is_unsharded() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"This API is for importing single-sharded or unsharded tenants"
)));
} else {
tenant_shard_id.tenant_id
};
// In case we are waking up a Detached tenant
match self.maybe_load_tenant(tenant_id, &_tenant_lock).await {
Ok(()) | Err(ApiError::NotFound(_)) => {
// This is a creation or an update
}
Err(e) => {
return Err(e);
}
};
}
// First check if this is a creation or an update
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);
let create_or_update = self.tenant_location_config_prepare(tenant_shard_id.tenant_id, req);
let mut result = TenantLocationConfigResponse {
shards: Vec::new(),
@@ -2621,7 +2492,6 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.
let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
@@ -2726,8 +2596,6 @@ impl Service {
let tenant_id = req.tenant_id;
let patch = req.config;
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
let base = {
let locked = self.inner.read().unwrap();
let shards = locked
@@ -2772,7 +2640,19 @@ impl Service {
)
.await;
self.maybe_load_tenant(req.tenant_id, &_tenant_lock).await?;
let tenant_exists = {
let locked = self.inner.read().unwrap();
let mut r = locked
.tenants
.range(TenantShardId::tenant_range(req.tenant_id));
r.next().is_some()
};
if !tenant_exists {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", req.tenant_id).into(),
));
}
self.set_tenant_config_and_reconcile(req.tenant_id, req.config)
.await
@@ -3065,8 +2945,6 @@ impl Service {
let _tenant_lock =
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
@@ -3186,8 +3064,6 @@ impl Service {
)
.await;
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
failpoint_support::sleep_millis_async!("tenant-update-policy-exclusive-lock");
let TenantPolicyRequest {
@@ -5270,13 +5146,11 @@ impl Service {
)));
}
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
persistent_shards
.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
let mut shards = self.persistence.list_tenant_shards().await?;
shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
if persistent_shards != expect_shards {
if shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
@@ -5285,7 +5159,7 @@ impl Service {
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&persistent_shards)
serde_json::to_string(&shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -6241,10 +6115,6 @@ impl Service {
let mut pending_reconciles = 0;
let mut az_violations = 0;
// If we find any tenants to drop from memory, stash them to offload after
// we're done traversing the map of tenants.
let mut drop_detached_tenants = Vec::new();
let mut reconciles_spawned = 0;
for shard in tenants.values_mut() {
// Accumulate scheduling statistics
@@ -6278,25 +6148,6 @@ impl Service {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
}
// If this tenant is detached, try dropping it from memory. This is usually done
// proactively in [`Self::process_results`], but we do it here to handle the edge
// case where a reconcile completes while someone else is holding an op lock for the tenant.
if shard.tenant_shard_id.shard_number == ShardNumber(0)
&& shard.policy == PlacementPolicy::Detached
{
if let Some(guard) = self.tenant_op_locks.try_exclusive(
shard.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
) {
drop_detached_tenants.push((shard.tenant_shard_id.tenant_id, guard));
}
}
}
// Process any deferred tenant drops
for (tenant_id, guard) in drop_detached_tenants {
self.maybe_drop_tenant(tenant_id, &mut locked, &guard);
}
metrics::METRICS_REGISTRY

View File

@@ -465,10 +465,6 @@ impl ObservedState {
locations: HashMap::new(),
}
}
pub(crate) fn is_empty(&self) -> bool {
self.locations.is_empty()
}
}
impl TenantShard {

View File

@@ -1,5 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::time::SystemTime;
use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
@@ -89,14 +88,9 @@ pub(crate) async fn branch_cleanup_and_check_errors(
match s3_data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
index_part_last_modified_time,
index_part_snapshot_time,
index_part_generation: _index_part_generation,
s3_layers: _s3_layers,
} => {
// Ignore missing file error if index_part downloaded is different from the one when listing the layer files.
let ignore_error = index_part_snapshot_time < index_part_last_modified_time
&& !cfg!(debug_assertions);
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
result
.errors
@@ -177,7 +171,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
is_l0,
);
if is_l0 || ignore_error {
if is_l0 {
result.warnings.push(msg);
} else {
result.errors.push(msg);
@@ -314,8 +308,6 @@ pub(crate) enum BlobDataParseResult {
Parsed {
index_part: Box<IndexPart>,
index_part_generation: Generation,
index_part_last_modified_time: SystemTime,
index_part_snapshot_time: SystemTime,
s3_layers: HashSet<(LayerName, Generation)>,
},
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
@@ -492,9 +484,9 @@ async fn list_timeline_blobs_impl(
}
if let Some(index_part_object_key) = index_part_object.as_ref() {
let (index_part_bytes, index_part_last_modified_time) =
let index_part_bytes =
match download_object_with_retries(remote_client, &index_part_object_key.key).await {
Ok(data) => data,
Ok(index_part_bytes) => index_part_bytes,
Err(e) => {
// It is possible that the branch gets deleted in-between we list the objects
// and we download the index part file.
@@ -508,7 +500,7 @@ async fn list_timeline_blobs_impl(
));
}
};
let index_part_snapshot_time = index_part_object_key.last_modified;
match serde_json::from_slice(&index_part_bytes) {
Ok(index_part) => {
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
@@ -516,8 +508,6 @@ async fn list_timeline_blobs_impl(
index_part: Box::new(index_part),
index_part_generation,
s3_layers,
index_part_last_modified_time,
index_part_snapshot_time,
},
unused_index_keys: index_part_keys,
unknown_keys,
@@ -635,7 +625,7 @@ pub(crate) async fn list_tenant_manifests(
let manifest_bytes =
match download_object_with_retries(remote_client, &latest_listing_object.key).await {
Ok((bytes, _)) => bytes,
Ok(bytes) => bytes,
Err(e) => {
// It is possible that the tenant gets deleted in-between we list the objects
// and we download the manifest file.

View File

@@ -13,7 +13,7 @@ pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;
use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
@@ -509,11 +509,10 @@ async fn list_objects_with_retries(
panic!("MAX_RETRIES is not allowed to be 0");
}
/// Returns content, last modified time
async fn download_object_with_retries(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
) -> anyhow::Result<(Vec<u8>, SystemTime)> {
) -> anyhow::Result<Vec<u8>> {
let cancel = CancellationToken::new();
for trial in 0..MAX_RETRIES {
let mut buf = Vec::new();
@@ -536,7 +535,7 @@ async fn download_object_with_retries(
{
Ok(bytes_read) => {
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
return Ok((buf, download.last_modified));
return Ok(buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");

View File

@@ -450,8 +450,6 @@ async fn gc_ancestor(
index_part: _,
index_part_generation: _,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => s3_layers,
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
@@ -588,9 +586,7 @@ async fn gc_timeline(
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
s3_layers: _s3_layers,
} => (index_part, *index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.

View File

@@ -47,8 +47,6 @@ impl MetadataSummary {
index_part,
index_part_generation: _,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} = &data.blob_data
{
*self
@@ -197,9 +195,7 @@ pub async fn scan_pageserver_metadata(
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
s3_layers: _s3_layers,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
@@ -322,11 +318,9 @@ pub async fn scan_pageserver_metadata(
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}

View File

@@ -268,8 +268,6 @@ impl SnapshotDownloader {
index_part,
index_part_generation,
s3_layers: _,
index_part_last_modified_time: _,
index_part_snapshot_time: _,
} => {
self.download_timeline(
ttid,

View File

@@ -2521,7 +2521,6 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
extra_env_vars: dict[str, str] | None = None,
timeout_in_seconds: int | None = None,
await_active: bool = True,
) -> Self:
"""
Start the page server.
@@ -2548,10 +2547,8 @@ class NeonPageserver(PgProtocol, LogUtils):
)
self.running = True
if (
await_active
and self.env.storage_controller.running
and self.env.storage_controller.node_registered(self.id)
if self.env.storage_controller.running and self.env.storage_controller.node_registered(
self.id
):
self.env.storage_controller.poll_node_status(
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
@@ -4933,30 +4930,13 @@ def check_restored_datadir_content(
assert (mismatch, error) == ([], [])
def logical_replication_sync(
subscriber: PgProtocol,
publisher: PgProtocol,
sub_dbname: str | None = None,
pub_dbname: str | None = None,
) -> Lsn:
def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn:
"""Wait logical replication subscriber to sync with publisher."""
if pub_dbname is not None:
publisher_lsn = Lsn(
publisher.safe_psql("SELECT pg_current_wal_flush_lsn()", dbname=pub_dbname)[0][0]
)
else:
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
while True:
if sub_dbname is not None:
res = subscriber.safe_psql(
"select latest_end_lsn from pg_catalog.pg_stat_subscription", dbname=sub_dbname
)[0][0]
else:
res = subscriber.safe_psql(
"select latest_end_lsn from pg_catalog.pg_stat_subscription"
)[0][0]
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
0
]
if res:
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)

View File

@@ -1,9 +1,7 @@
from __future__ import annotations
import logging
import requests
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
from fixtures.neon_fixtures import NeonEnv
TEST_DB_NAMES = [
{
@@ -138,115 +136,3 @@ def test_compute_create_databases(neon_simple_env: NeonEnv):
assert curr_db is not None
assert len(curr_db) == 1
assert curr_db[0] == db["name"]
def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can drop a database that has a logical replication subscription.
"""
env = neon_simple_env
# Create and start endpoint so that neon_local put all the generated
# stuff into the spec.json file.
endpoint = env.endpoints.create_start("main")
TEST_DB_NAMES = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": "subscriber_db",
"owner": "cloud_admin",
},
{
"name": "publisher_db",
"owner": "cloud_admin",
},
]
# Update the spec.json file to create the databases
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
},
}
)
endpoint.reconfigure()
# connect to the publisher_db and create a publication
with endpoint.cursor(dbname="publisher_db") as cursor:
cursor.execute("CREATE PUBLICATION mypub FOR ALL TABLES")
cursor.execute("select pg_catalog.pg_create_logical_replication_slot('mysub', 'pgoutput');")
cursor.execute("CREATE TABLE t(a int)")
cursor.execute("INSERT INTO t VALUES (1)")
# connect to the subscriber_db and create a subscription
# Note that we need to create subscription with
connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
with endpoint.cursor(dbname="subscriber_db") as cursor:
cursor.execute("CREATE TABLE t(a int)")
cursor.execute(
f"CREATE SUBSCRIPTION mysub CONNECTION '{connstr}' PUBLICATION mypub WITH (create_slot = false) "
)
# wait for the subscription to be active
logical_replication_sync(
endpoint, endpoint, sub_dbname="subscriber_db", pub_dbname="publisher_db"
)
# Check that replication is working
with endpoint.cursor(dbname="subscriber_db") as cursor:
cursor.execute("SELECT * FROM t")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == 1
# drop the subscriber_db from the list
TEST_DB_NAMES_NEW = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": "publisher_db",
"owner": "cloud_admin",
},
]
# Update the spec.json file to drop the database
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES_NEW,
},
"delta_operations": [
{"action": "delete_db", "name": "subscriber_db"},
# also test the case when we try to delete a non-existent database
# shouldn't happen in normal operation,
# but can occur when failed operations are retried
{"action": "delete_db", "name": "nonexistent_db"},
],
}
)
logging.info("Reconfiguring the endpoint to drop the subscriber_db")
endpoint.reconfigure()
# Check that the subscriber_db is dropped
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = %s", ("subscriber_db",))
catalog_db = cursor.fetchone()
assert catalog_db is None
# Check that we can still connect to the publisher_db
with endpoint.cursor(dbname="publisher_db") as cursor:
cursor.execute("SELECT * FROM current_database()")
curr_db = cursor.fetchone()
assert curr_db is not None
assert len(curr_db) == 1
assert curr_db[0] == "publisher_db"

View File

@@ -17,7 +17,6 @@ from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
LogCursor,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -2407,14 +2406,7 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
env.storage_controller.tenant_create(tid)
env.storage_controller.reconcile_until_idle()
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
def unpause_failpoint():
time.sleep(2)
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
thread = threading.Thread(target=unpause_failpoint)
thread.start()
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
# Make a change to the tenant config to trigger a slow reconcile
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -2429,8 +2421,6 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
observed_state = env.storage_controller.step_down()
log.info(f"Storage controller stepped down with {observed_state=}")
thread.join()
# Validate that we waited for the slow reconcile to complete
# and updated the observed state in the storcon before stepping down.
node_id = str(env.pageserver.id)
@@ -3299,221 +3289,8 @@ def test_storage_controller_detached_stopped(
"generation": None,
},
)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_detach_lifecycle(
neon_env_builder: NeonEnvBuilder,
):
"""
Test that detached tenants are handled properly through their lifecycle: getting dropped
from memory when detached, then getting loaded back on-demand.
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_configs()
env.start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=1,
)
virtual_ps_http.timeline_create(PgVersion.NOT_SET, tenant_id, timeline_id)
remote_prefix = "/".join(
(
"tenants",
str(tenant_id),
)
)
# We will later check data is gone after deletion, so as a control check that it is present to begin with
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=remote_prefix,
)
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
assert len(env.storage_controller.tenant_list()) == 1
# Detach the tenant
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
# Ensure reconciles are done (the one we do inline in location_conf is advisory and if it takes too long that API just succeeds anyway)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# Confirm the detach happened on pageserver
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
# Confirm the tenant is not in memory on the controller
assert env.storage_controller.tenant_list() == []
# The detached tenant does not get loaded into memory across a controller restart
env.storage_controller.stop()
env.storage_controller.start()
assert env.storage_controller.tenant_list() == []
env.storage_controller.consistency_check()
# The detached tenant can be re-attached
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "AttachedSingle",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
assert len(env.storage_controller.tenant_list()) == 1
env.storage_controller.consistency_check()
# Detach it again before doing deletion
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
# A detached tenant can be deleted
virtual_ps_http.tenant_delete(tenant_id)
# Such deletions really work (empty remote storage)
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=remote_prefix,
)
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_node_flap_detach_race(
neon_env_builder: NeonEnvBuilder,
):
"""
Reproducer for https://github.com/neondatabase/neon/issues/10253.
When a node's availability flaps, the reconciliations spawned by the node
going offline may race with the reconciliation done when then node comes
back online.
"""
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
)
env.storage_controller.reconcile_until_idle()
stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]
def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
assert res
return res[1]
# Stop the nodes which host attached shards.
# This will trigger reconciliations which pause before incrmenenting the generation,
# and, more importantly, updating the `generation_pageserver` of the shards.
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).stop(immediate=True)
def failure_handled() -> LogCursor:
stop_offset = None
for node_id in stopped_nodes:
res = env.storage_controller.log_contains(f"node {node_id} going offline")
assert res
stop_offset = res[1]
assert stop_offset
return stop_offset
offset = wait_until(failure_handled)
# Now restart the nodes and make them pause before marking themselves as available
# or running the activation reconciliation.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).start(await_active=False)
offset = wait_until(
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
)
# The nodes have restarted and are waiting to perform activaction reconciliation.
# Unpause the initial reconciliation triggered by the nodes going offline.
# It will attempt to detach from the old location, but notice that the old location
# is not yet available, and then stop before processing the results of the reconciliation.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))
offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))
# Let the nodes perform activation reconciliation while still holding up processing the result
# from the initial reconcile triggered by going offline.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))
def activate_reconciliation_done():
for node_id in stopped_nodes:
assert env.storage_controller.log_contains(
f"Node {node_id} transition to active", offset=offset
)
wait_until(activate_reconciliation_done)
# Finally, allow the initial reconcile to finish up.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))
# Give things a chance to settle and validate that no stale locations exist
env.storage_controller.reconcile_until_idle()
def validate_locations():
shard_locations = defaultdict(list)
for ps in env.pageservers:
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
shard_locations[loc[0]].append(
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
)
log.info(f"Shard locations: {shard_locations}")
attached_locations = {
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
for k, v in shard_locations.items()
}
for shard, locs in attached_locations.items():
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"
wait_until(validate_locations, timeout=10)

View File

@@ -27,6 +27,7 @@ camino = { version = "1", default-features = false, features = ["serde1"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -42,6 +43,7 @@ generic-array = { version = "0.14", default-features = false, features = ["more_
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
hdrhistogram = { version = "7" }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["full"] }
@@ -85,7 +87,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio = { version = "1", features = ["full", "test-util", "tracing"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
@@ -94,6 +96,7 @@ tonic = { version = "0.12", features = ["tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }