Compare commits

..

1 Commits

Author SHA1 Message Date
Bojan Serafimov
69eb02363b Preallocate vectors 2023-11-07 00:31:45 -05:00
164 changed files with 1993 additions and 6980 deletions

View File

@@ -1,3 +1,17 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.

View File

@@ -22,11 +22,5 @@ platforms = [
# "x86_64-pc-windows-msvc",
]
[final-excludes]
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
workspace-members = ["vm_monitor"]
# Write out exact versions rather than a semver range. (Defaults to false.)
# exact-versions = true

View File

@@ -17,9 +17,8 @@ assignees: ''
## Implementation ideas
```[tasklist]
### Tasks
```
## Tasks
- [ ]
## Other related tasks and Epics

View File

@@ -3,7 +3,7 @@
**NB: this PR must be merged only by 'Create a merge commit'!**
### Checklist when preparing for release
- [ ] Read or refresh [the release flow guide](https://www.notion.so/neondatabase/Release-general-flow-61f2e39fd45d4d14a70c7749604bd70b)
- [ ] Read or refresh [the release flow guide](https://github.com/neondatabase/cloud/wiki/Release:-general-flow)
- [ ] Ask in the [cloud Slack channel](https://neondb.slack.com/archives/C033A2WE6BZ) that you are going to rollout the release. Any blockers?
- [ ] Does this release contain any db migrations? Destructive ones? What is the rollback plan?

View File

@@ -1,7 +1,5 @@
self-hosted-runner:
labels:
- arm64
- dev
- gen3
- large
- small

View File

@@ -172,10 +172,10 @@ jobs:
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check --hide-inclusion-graph
run: cargo deny check
build-neon:
needs: [ check-permissions, tag ]
needs: [ check-permissions ]
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
@@ -187,7 +187,6 @@ jobs:
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
steps:
- name: Fix git ownership
@@ -586,13 +585,10 @@ jobs:
id: upload-coverage-report-new
env:
BUCKET: neon-github-public-dev
# A differential coverage report is available only for PRs.
# (i.e. for pushes into main/release branches we have a regular coverage report)
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
BASE_SHA: ${{ github.event.pull_request.base.sha || github.sha }}
run: |
BASELINE="$(git merge-base HEAD origin/main)"
CURRENT="${COMMIT_SHA}"
BASELINE="$(git merge-base $BASE_SHA $CURRENT)"
cp /tmp/coverage/report/lcov.info ./${CURRENT}.info
@@ -727,7 +723,6 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{ needs.tag.outputs.build-tag }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
@@ -852,7 +847,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.19.0
VM_BUILDER_VERSION: v0.18.5
steps:
- name: Checkout
@@ -874,7 +869,8 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-spec=vm-image-spec.yaml \
-enable-file-cache \
-cgroup-uid=postgres \
-src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
-dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}

View File

@@ -21,10 +21,7 @@ env:
jobs:
check-macos-build:
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos')
timeout-minutes: 90
runs-on: macos-latest
@@ -115,182 +112,8 @@ jobs:
- name: Check that no warnings are produced
run: ./run_clippy.sh
check-linux-arm-build:
timeout-minutes: 90
runs-on: [ self-hosted, dev, arm64 ]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
CARGO_FEATURES: --features testing
CARGO_FLAGS: --locked --release
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
- name: Set pg 15 revision for caching
id: pg_v15_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
- name: Set pg 16 revision for caching
id: pg_v16_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
- name: Set env variables
run: |
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo" >> $GITHUB_ENV
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v3
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v3
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v3
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make postgres-v16 -j$(nproc)
- name: Build neon extensions
run: mold -run make neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make walproposer-lib -j$(nproc)
- name: Run cargo build
run: |
mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run cargo test
run: |
cargo test $CARGO_FLAGS $CARGO_FEATURES
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_azure
check-codestyle-rust-arm:
timeout-minutes: 90
runs-on: [ self-hosted, dev, arm64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
if: ${{ !cancelled() }}
run: cargo fmt --all -- --check
# https://github.com/facebookincubator/cargo-guppy/tree/bec4e0eb29dcd1faac70b1b5360267fc02bf830e/tools/cargo-hakari#2-keep-the-workspace-hack-up-to-date-in-ci
- name: Check rust dependencies
if: ${{ !cancelled() }}
run: |
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check
gather-rust-build-stats:
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats')
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned

View File

@@ -9,24 +9,6 @@ refactoring, additional comments, and so forth. Let's try to raise the
bar, and clean things up as we go. Try to leave code in a better shape
than it was before.
## Pre-commit hook
We have a sample pre-commit hook in `pre-commit.py`.
To set it up, run:
```bash
ln -s ../../pre-commit.py .git/hooks/pre-commit
```
This will run following checks on staged files before each commit:
- `rustfmt`
- checks for python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
There is also a separate script `./run_clippy.sh` that runs `cargo clippy` on the whole project
and `./scripts/reformat` that runs all formatting tools to ensure the project is up to date.
If you want to skip the hook, run `git commit` with `--no-verify` option.
## Submitting changes
1. Get at least one +1 on your PR before you push.

725
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,6 @@ members = [
"control_plane",
"pageserver",
"pageserver/ctl",
"pageserver/pagebench",
"proxy",
"safekeeper",
"storage_broker",
@@ -37,7 +36,6 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
azure_core = "0.16"
azure_identity = "0.16"
@@ -49,7 +47,6 @@ async-trait = "0.1"
aws-config = { version = "0.56", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.29"
aws-smithy-http = "0.56"
aws-smithy-async = { version = "0.56", default-features = false, features=["rt-tokio"] }
aws-credential-types = "0.56"
aws-types = "0.56"
axum = { version = "0.6.20", features = ["ws"] }
@@ -68,7 +65,7 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
dashmap = { version = "5.5.0", features = ["raw-api"] }
dashmap = "5.5.0"
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
@@ -80,12 +77,11 @@ futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"
hostname = "0.3.1"
http-types = { version = "2", default-features = false }
http-types = "2"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
@@ -138,7 +134,6 @@ strum_macros = "0.24"
svg_fmt = "0.4.1"
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"
test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
@@ -167,11 +162,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -208,7 +203,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
################# Binary contents sections

View File

@@ -27,7 +27,6 @@ RUN set -e \
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
@@ -79,9 +78,9 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/

View File

@@ -72,10 +72,6 @@ neon: postgres-headers walproposer-lib
#
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \

View File

@@ -479,6 +479,13 @@ fn cli() -> clap::Command {
)
.value_name("FILECACHE_CONNSTR"),
)
.arg(
// DEPRECATED, NO LONGER DOES ANYTHING.
// See https://github.com/neondatabase/cloud/issues/7516
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),
)
}
#[test]

View File

@@ -710,12 +710,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
@@ -728,9 +724,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.

View File

@@ -78,7 +78,7 @@ use regex::Regex;
use remote_storage::*;
use serde_json;
use std::io::Read;
use std::num::NonZeroUsize;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::Path;
use std::str;
use tar::Archive;
@@ -133,6 +133,45 @@ fn parse_pg_version(human_version: &str) -> &str {
panic!("Unsuported postgres version {human_version}");
}
#[cfg(test)]
mod tests {
use super::parse_pg_version;
#[test]
fn test_parse_pg_version() {
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
"v15"
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
"v14"
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
}
#[test]
#[should_panic]
fn test_parse_pg_unsupported_version() {
parse_pg_version("PostgreSQL 13.14");
}
#[test]
#[should_panic]
fn test_parse_pg_incorrect_version_format() {
parse_pg_version("PostgreSQL 14");
}
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
@@ -242,46 +281,9 @@ pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRem
max_keys_per_list_response: None,
};
let config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
storage: RemoteStorageKind::AwsS3(config),
};
GenericRemoteStorage::from_config(&config)
}
#[cfg(test)]
mod tests {
use super::parse_pg_version;
#[test]
fn test_parse_pg_version() {
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
"v15"
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
"v14"
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
}
#[test]
#[should_panic]
fn test_parse_pg_unsupported_version() {
parse_pg_version("PostgreSQL 13.14");
}
#[test]
#[should_panic]
fn test_parse_pg_incorrect_version_format() {
parse_pg_version("PostgreSQL 14");
}
}

View File

@@ -1,7 +1,7 @@
//!
//! Various tools and helpers to handle cluster / compute node (Postgres)
//! configuration.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
//!
pub mod checker;
pub mod config;
pub mod configurator;

View File

@@ -9,7 +9,6 @@ pub struct AttachmentService {
env: LocalEnv,
listen: String,
path: PathBuf,
client: reqwest::blocking::Client,
}
const COMMAND: &str = "attachment_service";
@@ -25,16 +24,6 @@ pub struct AttachHookResponse {
pub gen: Option<u32>,
}
#[derive(Serialize, Deserialize)]
pub struct InspectRequest {
pub tenant_id: TenantId,
}
#[derive(Serialize, Deserialize)]
pub struct InspectResponse {
pub attachment: Option<(u32, NodeId)>,
}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
@@ -53,9 +42,6 @@ impl AttachmentService {
env: env.clone(),
path,
listen,
client: reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
}
}
@@ -98,13 +84,16 @@ impl AttachmentService {
.unwrap()
.join("attach-hook")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
let request = AttachHookRequest {
tenant_id,
node_id: Some(pageserver_id),
};
let response = self.client.post(url).json(&request).send()?;
let response = client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
@@ -112,26 +101,4 @@ impl AttachmentService {
let response = response.json::<AttachHookResponse>()?;
Ok(response.gen)
}
pub fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
use hyper::StatusCode;
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join("inspect")
.unwrap();
let request = InspectRequest { tenant_id };
let response = self.client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
let response = response.json::<InspectResponse>()?;
Ok(response.attachment)
}
}

View File

@@ -86,10 +86,7 @@ where
.stdout(process_log_file)
.stderr(same_file_for_stderr)
.args(args);
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
fill_rust_env_vars(background_command),
));
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
filled_cmd.envs(envs);
let pid_file_to_check = match initial_pid_file {
@@ -256,15 +253,6 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
cmd
}
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
for (var, val) in std::env::vars() {
if var.starts_with("NEON_") {
cmd = cmd.env(var, val);
}
}
cmd
}
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
/// 1. Claims a pidfile with a fcntl lock on it and
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)
@@ -274,7 +262,7 @@ where
P: Into<Utf8PathBuf>,
{
let path: Utf8PathBuf = path.into();
// SAFETY:
// SAFETY
// pre_exec is marked unsafe because it runs between fork and exec.
// Why is that dangerous in various ways?
// Long answer: https://github.com/rust-lang/rust/issues/39575

View File

@@ -32,9 +32,7 @@ use pageserver_api::control_api::{
ValidateResponseTenant,
};
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -257,36 +255,19 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
)
}
async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let locked = state.write().await;
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
json_response(
StatusCode::OK,
InspectResponse {
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
},
)
}
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/inspect", |r| request_span(r, handle_inspect))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _guard = logging::init(
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
let args = Cli::parse();

View File

@@ -11,14 +11,13 @@ use compute_api::spec::ComputeMode;
use control_plane::attachment_service::AttachmentService;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use postgres_backend::AuthType;
use safekeeper_api::{
@@ -47,8 +46,8 @@ const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
fn default_conf(num_pageservers: u16) -> String {
let mut template = format!(
fn default_conf() -> String {
format!(
r#"
# Default built-in configuration, defined in main.rs
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
@@ -56,33 +55,21 @@ control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
[broker]
listen_addr = '{DEFAULT_BROKER_ADDR}'
[[pageservers]]
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
);
for i in 0..num_pageservers {
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
template += &format!(
r#"
[[pageservers]]
id = {pageserver_id}
listen_pg_addr = '127.0.0.1:{pg_port}'
listen_http_addr = '127.0.0.1:{http_port}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
"#,
trust_auth = AuthType::Trust,
)
}
template
trust_auth = AuthType::Trust,
)
}
///
@@ -308,9 +295,6 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId
}
fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let num_pageservers = init_match
.get_one::<u16>("num-pageservers")
.expect("num-pageservers arg has a default");
// Create config file
let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
// load and parse the file
@@ -322,7 +306,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
})?
} else {
// Built-in default config
default_conf(*num_pageservers)
default_conf()
};
let pg_version = init_match
@@ -336,9 +320,6 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
env.init(pg_version, force)
.context("Failed to initialize neon repository")?;
// Create remote storage location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
// Initialize pageserver, create initial tenant and timeline.
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
@@ -452,15 +433,6 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
}
Some(("migrate", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let new_pageserver = get_pageserver(env, matches)?;
let new_pageserver_id = new_pageserver.conf.id;
migrate_tenant(env, tenant_id, new_pageserver)?;
println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),
}
@@ -895,20 +867,20 @@ fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Res
}
}
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
@@ -945,20 +917,6 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
Some(("migrate", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("status", subcommand_args)) => {
match get_pageserver(env, subcommand_args)?.check_status() {
Ok(_) => println!("Page server is up and running"),
@@ -1266,13 +1224,6 @@ fn cli() -> Command {
.help("Force initialization even if the repository is not empty")
.required(false);
let num_pageservers_arg = Arg::new("num-pageservers")
.value_parser(value_parser!(u16))
.long("num-pageservers")
.help("How many pageservers to create (default 1)")
.required(false)
.default_value("1");
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1280,7 +1231,6 @@ fn cli() -> Command {
Command::new("init")
.about("Initialize a new Neon repository, preparing configs for services to start with")
.arg(pageserver_config_args.clone())
.arg(num_pageservers_arg.clone())
.arg(
Arg::new("config")
.long("config")
@@ -1351,10 +1301,6 @@ fn cli() -> Command {
.subcommand(Command::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
.subcommand(Command::new("migrate")
.about("Migrate a tenant from one pageserver to another")
.arg(tenant_id_arg.clone())
.arg(pageserver_id_arg.clone()))
)
.subcommand(
Command::new("pageserver")

View File

@@ -1,10 +1,11 @@
//! Local control plane.
//!
//! Can start, configure and stop postgres instances running as a local processes.
//!
//! Intended to be used in integration tests and in CLI tools for
//! local installations.
#![deny(clippy::undocumented_unsafe_blocks)]
//
// Local control plane.
//
// Can start, configure and stop postgres instances running as a local processes.
//
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
pub mod attachment_service;
mod background_process;
@@ -14,4 +15,3 @@ pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;
pub mod tenant_migration;

View File

@@ -15,10 +15,7 @@ use std::{io, result};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -34,9 +31,6 @@ use utils::{
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
#[derive(Error, Debug)]
pub enum PageserverHttpError {
#[error("Reqwest error: {0}")]
@@ -104,10 +98,8 @@ impl PageServerNode {
}
}
/// Merge overrides provided by the user on the command line with our default overides derived from neon_local configuration.
///
/// These all end up on the command line of the `pageserver` binary.
fn neon_local_overrides(&self, cli_overrides: &[&str]) -> Vec<String> {
// pageserver conf overrides defined by neon_local configuration.
fn neon_local_overrides(&self) -> Vec<String> {
let id = format!("id={}", self.conf.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param = format!(
@@ -140,25 +132,12 @@ impl PageServerNode {
));
}
if !cli_overrides
.iter()
.any(|c| c.starts_with("remote_storage"))
{
overrides.push(format!(
"remote_storage={{local_path='../{PAGESERVER_REMOTE_STORAGE_DIR}'}}"
));
}
if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust
{
// Keys are generated in the toplevel repo dir, pageservers' workdirs
// are one level below that, so refer to keys with ../
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
}
// Apply the user-provided overrides
overrides.extend(cli_overrides.iter().map(|&c| c.to_owned()));
overrides
}
@@ -224,6 +203,9 @@ impl PageServerNode {
}
fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
@@ -266,7 +248,8 @@ impl PageServerNode {
) -> Vec<Cow<'a, str>> {
let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
let overrides = self.neon_local_overrides(config_overrides);
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
for config_override in overrides {
args.push(Cow::Borrowed("-c"));
args.push(Cow::Owned(config_override));
@@ -409,7 +392,7 @@ impl PageServerNode {
};
let request = models::TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
new_tenant_id,
generation,
config,
};
@@ -518,27 +501,6 @@ impl PageServerNode {
Ok(())
}
pub fn location_config(
&self,
tenant_id: TenantId,
config: LocationConfig,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
self.http_request(
Method::PUT,
format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
),
)?
.json(&req_body)
.send()?
.error_from_body()?;
Ok(())
}
pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfo> = self
.http_request(

View File

@@ -1,202 +0,0 @@
//!
//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code
//! isn't scoped to a particular physical service, as it needs to update compute endpoints to
//! point to the new pageserver.
//!
use crate::local_env::LocalEnv;
use crate::{
attachment_service::AttachmentService, endpoint::ComputeControlPlane,
pageserver::PageServerNode,
};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
use std::collections::HashMap;
use std::time::Duration;
use utils::{
generation::Generation,
id::{TenantId, TimelineId},
lsn::Lsn,
};
/// Given an attached pageserver, retrieve the LSN for all timelines
fn get_lsns(
tenant_id: TenantId,
pageserver: &PageServerNode,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let timelines = pageserver.timeline_list(&tenant_id)?;
Ok(timelines
.into_iter()
.map(|t| (t.timeline_id, t.last_record_lsn))
.collect())
}
/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
/// `baseline`.
fn await_lsn(
tenant_id: TenantId,
pageserver: &PageServerNode,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
loop {
let latest = match get_lsns(tenant_id, pageserver) {
Ok(l) => l,
Err(e) => {
println!(
"🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
pageserver.conf.id
);
std::thread::sleep(Duration::from_millis(500));
continue;
}
};
let mut any_behind: bool = false;
for (timeline_id, baseline_lsn) in &baseline {
match latest.get(timeline_id) {
Some(latest_lsn) => {
println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
if latest_lsn < baseline_lsn {
any_behind = true;
}
}
None => {
// Expected timeline isn't yet visible on migration destination.
// (IRL we would have to account for timeline deletion, but this
// is just test helper)
any_behind = true;
}
}
}
if !any_behind {
println!("✅ LSN caught up. Proceeding...");
break;
} else {
std::thread::sleep(Duration::from_millis(500));
}
}
Ok(())
}
/// This function spans multiple services, to demonstrate live migration of a tenant
/// between pageservers:
/// - Coordinate attach/secondary/detach on pageservers
/// - call into attachment_service for generations
/// - reconfigure compute endpoints to point to new attached pageserver
pub fn migrate_tenant(
env: &LocalEnv,
tenant_id: TenantId,
dest_ps: PageServerNode,
) -> anyhow::Result<()> {
// Get a new generation
let attachment_service = AttachmentService::from_env(env);
let previous = attachment_service.inspect(tenant_id)?;
let mut baseline_lsns = None;
if let Some((generation, origin_ps_id)) = &previous {
let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?);
if origin_ps_id == &dest_ps.conf.id {
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = LocationConfig {
mode: LocationConfigMode::AttachedSingle,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
dest_ps.location_config(tenant_id, dest_conf)?;
println!("✅ Migration complete");
return Ok(());
}
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
let stale_conf = LocationConfig {
mode: LocationConfigMode::AttachedStale,
generation: Some(Generation::new(*generation)),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
origin_ps.location_config(tenant_id, stale_conf)?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
}
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = LocationConfig {
mode: LocationConfigMode::AttachedMulti,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf)?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
await_lsn(tenant_id, &dest_ps, baseline)?;
}
let cplane = ComputeControlPlane::load(env.clone())?;
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id {
println!(
"🔁 Reconfiguring endpoint {} to use pageserver {}",
endpoint_name, dest_ps.conf.id
);
endpoint.reconfigure(Some(dest_ps.conf.id))?;
}
}
for other_ps_conf in &env.pageservers {
if other_ps_conf.id == dest_ps.conf.id {
continue;
}
let other_ps = PageServerNode::from_env(env, other_ps_conf);
let other_ps_tenants = other_ps.tenant_list()?;
// Check if this tenant is attached
let found = other_ps_tenants
.into_iter()
.map(|t| t.id)
.any(|i| i == tenant_id);
if !found {
continue;
}
// Downgrade to a secondary location
let secondary_conf = LocationConfig {
mode: LocationConfigMode::Secondary,
generation: None,
secondary_conf: Some(LocationConfigSecondary { warm: true }),
tenant_conf: TenantConfig::default(),
};
println!(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf)?;
}
println!(
"🔁 Switching to AttachedSingle mode on pageserver {}",
dest_ps.conf.id
);
let dest_conf = LocationConfig {
mode: LocationConfigMode::AttachedSingle,
generation: gen.map(Generation::new),
secondary_conf: None,
tenant_conf: TenantConfig::default(),
};
dest_ps.location_config(tenant_id, dest_conf)?;
println!("✅ Migration complete");
Ok(())
}

View File

@@ -74,30 +74,10 @@ highlight = "all"
workspace-default-features = "allow"
external-default-features = "allow"
allow = []
deny = []
skip = []
skip-tree = []
[[bans.deny]]
# we use tokio, the same rationale applies for async-{io,waker,global-executor,executor,channel,lock}, smol
# if you find yourself here while adding a dependency, try "default-features = false", ask around on #rust
name = "async-std"
[[bans.deny]]
name = "async-io"
[[bans.deny]]
name = "async-waker"
[[bans.deny]]
name = "async-global-executor"
[[bans.deny]]
name = "async-executor"
[[bans.deny]]
name = "smol"
# This section is considered when running `cargo deny check sources`.
# More documentation about the 'sources' section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html

View File

@@ -177,7 +177,7 @@ I e during migration create_branch can be called on old pageserver and newly cre
The difference of simplistic approach from one described above is that it calls ignore on source tenant first and then calls attach on target pageserver. Approach above does it in opposite order thus opening a possibility for race conditions we strive to avoid.
The approach largely follows this guide: <https://www.notion.so/neondatabase/Cloud-Ad-hoc-tenant-relocation-f687474f7bfc42269e6214e3acba25c7>
The approach largely follows this guide: <https://github.com/neondatabase/cloud/wiki/Cloud:-Ad-hoc-tenant-relocation>
The happy path sequence:

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod requests;
pub mod responses;
pub mod spec;

View File

@@ -1,6 +1,6 @@
//!
//! Shared code for consumption metics collection
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
//!
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::{Deserialize, Serialize};

View File

@@ -2,7 +2,6 @@
//! make sure that we use the same dep version everywhere.
//! Otherwise, we might not see all metrics registered via
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;

View File

@@ -17,9 +17,5 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -1,174 +0,0 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Self::from_hex(s)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use crate::key::Key;
#[test]
fn display_fromstr_bijection() {
let mut rng = rand::thread_rng();
use rand::Rng;
let key = Key {
field1: rng.gen(),
field2: rng.gen(),
field3: rng.gen(),
field4: rng.gen(),
field5: rng.gen(),
field6: rng.gen(),
};
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
}
}

View File

@@ -1,13 +1,9 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -16,9 +16,9 @@ use utils::{
lsn::Lsn,
};
use crate::{reltag::RelTag, shard::TenantShardId};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
@@ -187,7 +187,7 @@ pub struct TimelineCreateRequest {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantShardId,
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
@@ -371,8 +371,6 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -517,8 +515,6 @@ pub enum HistoricLayerInfo {
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
Image {
layer_file_name: String,
@@ -527,8 +523,6 @@ pub enum HistoricLayerInfo {
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
}
@@ -773,36 +767,6 @@ impl PagestreamBeMessage {
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
match msg_tag {
100 => todo!(),
101 => todo!(),
102 => {
let buf = buf.get_ref();
/* TODO use constant */
if buf.len() == 8192 {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page: buf.clone(),
}))
} else {
anyhow::bail!("invalid page size: {}", buf.len());
}
}
103 => {
let buf = buf.get_ref();
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
let rust_str = cstr.to_str()?;
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
message: rust_str.to_owned(),
}))
}
104 => todo!(),
_ => bail!("unknown tag: {:?}", msg_tag),
}
}
}
#[cfg(test)]
@@ -868,7 +832,6 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -889,7 +852,6 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -1,321 +0,0 @@
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
/// TenantShardId identify the units of work for the Pageserver.
///
/// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
///
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// Historically, tenants could not have multiple shards, and were identified
/// by TenantId. To support this, TenantShardId has a special legacy
/// mode where `shard_count` is equal to zero: this represents a single-sharded
/// tenant which should be written as a TenantId with no suffix.
///
/// The human-readable encoding of TenantShardId, such as used in API URLs,
/// is both forward and backward compatible: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> String {
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(
f,
"{}-{:02x}{:02x}",
self.tenant_id, self.shard_number.0, self.shard_count.0
)
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bincode;
use utils::{id::TenantId, Hex};
use super::*;
const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
#[test]
fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = format!("{example}");
let expected = format!("{EXAMPLE_TENANT_ID}-070a");
assert_eq!(&encoded, &expected);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x07, 0x0a,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
// Test that TenantShardId can decode a TenantId in human
// readable form
let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded.tenant_id);
assert_eq!(decoded.shard_count, ShardCount(0));
assert_eq!(decoded.shard_number, ShardNumber(0));
Ok(())
}
#[test]
fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
// Test that a legacy TenantShardId encodes into a form that
// can be decoded as TenantId
let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let example = TenantShardId::unsharded(example_tenant_id);
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantId::from_str(&encoded)?;
assert_eq!(example_tenant_id, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
// Unlike in human readable encoding, binary encoding does not
// do any special handling of legacy unsharded TenantIds: this test
// is equivalent to the main test for binary encoding, just verifying
// that the same behavior applies when we have used `unsharded()` to
// construct a TenantShardId.
let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x00, 0x00,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
}

View File

@@ -2,8 +2,6 @@
//! To use, create PostgresBackend and run() it, passing the Handler
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
use futures::pin_mut;
@@ -17,7 +15,7 @@ use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
@@ -35,11 +33,6 @@ pub enum QueryError {
/// We were instructed to shutdown while processing the query
#[error("Shutting down")]
Shutdown,
/// Authentication failure
#[error("Unauthorized: {0}")]
Unauthorized(std::borrow::Cow<'static, str>),
#[error("Simulated Connection Error")]
SimulatedConnectionError,
/// Some other error
#[error(transparent)]
Other(#[from] anyhow::Error),
@@ -54,9 +47,8 @@ impl From<io::Error> for QueryError {
impl QueryError {
pub fn pg_error_code(&self) -> &'static [u8; 5] {
match self {
Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
Self::Disconnected(_) => b"08006", // connection failure
Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
}
}
@@ -616,7 +608,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
if let Err(e) = handler.check_auth_jwt(self, jwt_response) {
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error(&e),
&e.to_string(),
Some(e.pg_error_code()),
))?;
return Err(e);
@@ -738,9 +730,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
if let Err(e) = handler.process_query(self, query_string).await {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
QueryError::SimulatedConnectionError => {
return Err(QueryError::SimulatedConnectionError)
}
e => {
log_query_error(query_string, &e);
let short_error = short_error(&e);
@@ -975,8 +964,6 @@ pub fn short_error(e: &QueryError) -> String {
match e {
QueryError::Disconnected(connection_error) => connection_error.to_string(),
QueryError::Shutdown => "shutdown".to_string(),
QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
QueryError::Other(e) => format!("{e:#}"),
}
}
@@ -993,15 +980,9 @@ fn log_query_error(query: &str, e: &QueryError) {
QueryError::Disconnected(other_connection_error) => {
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
}
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")
}
QueryError::Shutdown => {
info!("query handler for '{query}' cancelled during tenant shutdown")
}
QueryError::Unauthorized(e) => {
warn!("query handler for '{query}' failed with authentication error: {e}");
}
QueryError::Other(e) => {
error!("query handler for '{query}' failed: {e:?}");
}

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;

View File

@@ -8,7 +8,6 @@
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
#![deny(clippy::undocumented_unsafe_blocks)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
@@ -21,7 +20,6 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(clippy::undocumented_unsafe_blocks)]
use serde::{Deserialize, Serialize};
include!(concat!(

View File

@@ -1,7 +1,6 @@
//! Postgres protocol messages serialization-deserialization. See
//! <https://www.postgresql.org/docs/devel/protocol-message-formats.html>
//! on message formats.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod framed;

View File

@@ -8,7 +8,6 @@ license.workspace = true
anyhow.workspace = true
async-trait.workspace = true
once_cell.workspace = true
aws-smithy-async.workspace = true
aws-smithy-http.workspace = true
aws-types.workspace = true
aws-config.workspace = true

View File

@@ -6,15 +6,19 @@
//! * [`s3_bucket`] uses AWS S3 bucket as an external storage
//! * [`azure_blob`] allows to use Azure Blob storage as an external storage
//!
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod local_fs;
mod s3_bucket;
mod simulate_failures;
use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc};
use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
sync::Arc,
};
use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
@@ -30,6 +34,12 @@ pub use self::{
};
use s3_bucket::RequestKind;
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
/// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed.
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -81,12 +91,6 @@ impl std::fmt::Display for RemotePath {
}
}
impl From<RemotePath> for String {
fn from(val: RemotePath) -> Self {
val.0.into()
}
}
impl RemotePath {
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
anyhow::ensure!(
@@ -108,7 +112,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
pub fn join(&self, segment: &Utf8Path) -> Self {
Self(self.0.join(segment))
}
@@ -437,6 +441,10 @@ pub struct StorageMetadata(HashMap<String, String>);
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
/// Max allowed number of concurrent sync operations between the API user and the remote storage.
pub max_concurrent_syncs: NonZeroUsize,
/// Max allowed errors before the sync task is considered failed and evicted.
pub max_sync_errors: NonZeroU32,
/// The storage connection configuration.
pub storage: RemoteStorageKind,
}
@@ -532,6 +540,18 @@ impl RemoteStorageConfig {
let use_azure = container_name.is_some() && container_region.is_some();
let max_concurrent_syncs = NonZeroUsize::new(
parse_optional_integer("max_concurrent_syncs", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
)
.context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let default_concurrency_limit = if use_azure {
DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
} else {
@@ -613,7 +633,11 @@ impl RemoteStorageConfig {
}
};
Ok(Some(RemoteStorageConfig { storage }))
Ok(Some(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
}))
}
}

View File

@@ -4,27 +4,23 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;
use anyhow::Context;
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::CredentialsProviderChain,
provider_config::ProviderConfig,
retry::{RetryConfigBuilder, RetryMode},
web_identity_token::WebIdentityTokenCredentialsProvider,
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
};
use aws_credential_types::cache::CredentialsCache;
use aws_sdk_s3::{
config::{AsyncSleep, Config, Region, SharedAsyncSleep},
config::{Config, Region},
error::SdkError,
operation::get_object::GetObjectError,
primitives::ByteStream,
types::{Delete, ObjectIdentifier},
Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use scopeguard::ScopeGuard;
@@ -87,23 +83,10 @@ impl S3Bucket {
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
// We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
// responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
// attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
let mut retry_config = RetryConfigBuilder::new();
retry_config
.set_max_attempts(Some(1))
.set_mode(Some(RetryMode::Adaptive));
let mut config_builder = Config::builder()
.region(region)
.credentials_cache(CredentialsCache::lazy())
.credentials_provider(credentials_provider)
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.retry_config(retry_config.build());
.credentials_provider(credentials_provider);
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
config_builder = config_builder

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
@@ -278,10 +278,9 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
let _ = utils::logging::init(
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});
@@ -470,6 +469,8 @@ fn create_azure_client(
let random = rand::thread_rng().gen::<u32>();
let remote_storage_config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(5).unwrap(),
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: remote_storage_azure_container,
container_region: remote_storage_azure_region,

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::sync::Arc;
@@ -207,10 +207,9 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
let _ = utils::logging::init(
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});
@@ -397,6 +396,8 @@ fn create_s3_client(
let random = rand::thread_rng().gen::<u32>();
let remote_storage_config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(5).unwrap(),
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: remote_storage_s3_bucket,
bucket_region: remote_storage_s3_region,

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types

View File

@@ -1,6 +1,4 @@
//! Synthetic size calculation
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod calculation;
pub mod svg;

View File

@@ -32,8 +32,6 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true
@@ -49,8 +48,6 @@ const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
tracing-chrome = "0.7.1"
tracing-flame = "0.2.0"
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,8 +1,7 @@
// For details about authentication see docs/authentication.md
use arc_swap::ArcSwap;
use serde;
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
use std::fs;
use anyhow::Result;
use camino::Utf8Path;
@@ -11,7 +10,7 @@ use jsonwebtoken::{
};
use serde::{Deserialize, Serialize};
use crate::{http::error::ApiError, id::TenantId};
use crate::id::TenantId;
/// Algorithm to use. We require EdDSA.
const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
@@ -45,106 +44,31 @@ impl Claims {
}
}
pub struct SwappableJwtAuth(ArcSwap<JwtAuth>);
impl SwappableJwtAuth {
pub fn new(jwt_auth: JwtAuth) -> Self {
SwappableJwtAuth(ArcSwap::new(Arc::new(jwt_auth)))
}
pub fn swap(&self, jwt_auth: JwtAuth) {
self.0.swap(Arc::new(jwt_auth));
}
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, AuthError> {
self.0.load().decode(token)
}
}
impl std::fmt::Debug for SwappableJwtAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Swappable({:?})", self.0.load())
}
}
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct AuthError(pub Cow<'static, str>);
impl Display for AuthError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<AuthError> for ApiError {
fn from(_value: AuthError) -> Self {
// Don't pass on the value of the AuthError as a precautionary measure.
// Being intentionally vague in public error communication hurts debugability
// but it is more secure.
ApiError::Forbidden("JWT authentication error".to_string())
}
}
pub struct JwtAuth {
decoding_keys: Vec<DecodingKey>,
decoding_key: DecodingKey,
validation: Validation,
}
impl JwtAuth {
pub fn new(decoding_keys: Vec<DecodingKey>) -> Self {
pub fn new(decoding_key: DecodingKey) -> Self {
let mut validation = Validation::default();
validation.algorithms = vec![STORAGE_TOKEN_ALGORITHM];
// The default 'required_spec_claims' is 'exp'. But we don't want to require
// expiration.
validation.required_spec_claims = [].into();
Self {
decoding_keys,
decoding_key,
validation,
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
let metadata = key_path.metadata()?;
let decoding_keys = if metadata.is_dir() {
let mut keys = Vec::new();
for entry in fs::read_dir(key_path)? {
let path = entry?.path();
if !path.is_file() {
// Ignore directories (don't recurse)
continue;
}
let public_key = fs::read(path)?;
keys.push(DecodingKey::from_ed_pem(&public_key)?);
}
keys
} else if metadata.is_file() {
let public_key = fs::read(key_path)?;
vec![DecodingKey::from_ed_pem(&public_key)?]
} else {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
}
Ok(Self::new(decoding_keys))
let public_key = fs::read(key_path)?;
Ok(Self::new(DecodingKey::from_ed_pem(&public_key)?))
}
/// Attempt to decode the token with the internal decoding keys.
///
/// The function tries the stored decoding keys in succession,
/// and returns the first yielding a successful result.
/// If there is no working decoding key, it returns the last error.
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, AuthError> {
let mut res = None;
for decoding_key in &self.decoding_keys {
res = Some(decode(token, decoding_key, &self.validation));
if let Some(Ok(res)) = res {
return Ok(res);
}
}
if let Some(res) = res {
res.map_err(|e| AuthError(Cow::Owned(e.to_string())))
} else {
Err(AuthError(Cow::Borrowed("no JWT decoding keys configured")))
}
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
Ok(decode(token, &self.decoding_key, &self.validation)?)
}
}
@@ -184,9 +108,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
"#;
#[test]
fn test_decode() {
fn test_decode() -> Result<(), anyhow::Error> {
let expected_claims = Claims {
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081")?),
scope: Scope::Tenant,
};
@@ -205,24 +129,28 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJleHAiOjE3MDkyMDA4NzksImlhdCI6MTY3ODQ0MjQ3OX0.U3eA8j-uU-JnhzeO3EDHRuXLwkAUFCPxtGHEgw6p7Ccc3YRbFs2tmCdbD9PZEXP-XsxSeBQi1FY0YPcT3NXADw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let claims_from_token = auth.decode(encoded_eddsa)?.claims;
assert_eq!(claims_from_token, expected_claims);
Ok(())
}
#[test]
fn test_encode() {
fn test_encode() -> Result<(), anyhow::Error> {
let claims = Claims {
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081")?),
scope: Scope::Tenant,
};
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519)?;
// decode it back
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let decoded = auth.decode(&encoded).unwrap();
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let decoded = auth.decode(&encoded)?;
assert_eq!(decoded.claims, claims);
Ok(())
}
}

View File

@@ -1,4 +1,4 @@
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::auth::{Claims, JwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION};
@@ -389,7 +389,7 @@ fn parse_token(header_value: &str) -> Result<&str, ApiError> {
}
pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
provide_auth: fn(&Request<Body>) -> Option<&SwappableJwtAuth>,
provide_auth: fn(&Request<Body>) -> Option<&JwtAuth>,
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
if let Some(auth) = provide_auth(&req) {
@@ -400,11 +400,9 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
})?;
let token = parse_token(header_value)?;
let data = auth.decode(token).map_err(|err| {
warn!("Authentication error: {err}");
// Rely on From<AuthError> for ApiError impl
err
})?;
let data = auth
.decode(token)
.map_err(|_| ApiError::Unauthorized("malformed jwt token".to_string()))?;
req.set_context(data.claims);
}
None => {
@@ -452,11 +450,12 @@ where
pub fn check_permission_with(
req: &Request<Body>,
check_permission: impl Fn(&Claims) -> Result<(), AuthError>,
check_permission: impl Fn(&Claims) -> Result<(), anyhow::Error>,
) -> Result<(), ApiError> {
match req.context::<Claims>() {
Some(claims) => Ok(check_permission(&claims)
.map_err(|_err| ApiError::Forbidden("JWT authentication error".to_string()))?),
Some(claims) => {
Ok(check_permission(&claims).map_err(|err| ApiError::Forbidden(err.to_string()))?)
}
None => Ok(()), // claims is None because auth is disabled
}
}

View File

@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use thiserror::Error;
use tracing::{error, info, warn};
use tracing::{error, info};
#[derive(Debug, Error)]
pub enum ApiError {
@@ -118,9 +118,6 @@ pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
match api_error {
ApiError::Forbidden(_) | ApiError::Unauthorized(_) => {
warn!("Error processing HTTP request: {api_error:#}")
}
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),

View File

@@ -120,8 +120,6 @@ impl Id {
chunk[0] = HEX[((b >> 4) & 0xf) as usize];
chunk[1] = HEX[(b & 0xf) as usize];
}
// SAFETY: vec constructed out of `HEX`, it can only be ascii
unsafe { String::from_utf8_unchecked(buf) }
}
}

View File

@@ -1,6 +1,5 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod backoff;

View File

@@ -1,4 +1,4 @@
use std::{io::BufWriter, str::FromStr};
use std::str::FromStr;
use anyhow::Context;
use once_cell::sync::Lazy;
@@ -66,25 +66,10 @@ pub enum TracingErrorLayerEnablement {
EnableWithRustLogFilter,
}
/// Where the logging should output to.
#[derive(Clone, Copy)]
pub enum Output {
Stdout,
Stderr,
}
/// Keep alive and drop it before the program terminates.
#[must_use]
pub struct FlushGuard {
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
output: Output,
) -> anyhow::Result<FlushGuard> {
) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -92,60 +77,15 @@ pub fn init(
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// WIP: lift it up as an argument
let enable_tracing_chrome = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_CHROME") {
Ok(s) if s != "0" => true,
Ok(_s) => false,
Err(std::env::VarError::NotPresent) => false,
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_CHROME not unicode")
}
};
// WIP: lift it up as an argument
let enable_tracing_flame = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_FLAME") {
Ok(s) if s != "0" => true,
Ok(_s) => false,
Err(std::env::VarError::NotPresent) => false,
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_FLAME not unicode")
}
};
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
#[derive(Default)]
struct LayerStack {
layers:
Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
}
impl LayerStack {
fn add_layer<L>(&mut self, new_layer: L)
where
L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
{
let new = match self.layers.take() {
Some(layers) => Some(layers.and_then(new_layer).boxed()),
None => Some(new_layer.boxed()),
};
self.layers = new;
}
}
let mut layers = LayerStack::default();
layers.add_layer({
let r = tracing_subscriber::registry();
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
.with_writer(move || -> Box<dyn std::io::Write> {
match output {
Output::Stdout => Box::new(std::io::stdout()),
Output::Stderr => Box::new(std::io::stderr()),
}
});
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
@@ -153,47 +93,15 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
layers
.add_layer(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
let tracing_chrome_layer_flush_guard = if enable_tracing_chrome {
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.trace_style(tracing_chrome::TraceStyle::Async)
.build();
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
let tracing_flame_flush_guard = if enable_tracing_flame {
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let layer = layer
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
TracingErrorLayerEnablement::Disabled => (),
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
}
let r = tracing_subscriber::registry();
r.with(layers.layers.expect("we add at least one layer"))
.init();
Ok(FlushGuard {
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
_tracing_flame_layer: tracing_flame_flush_guard,
})
Ok(())
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -366,47 +366,6 @@ impl MonotonicCounter<Lsn> for RecordLsn {
}
}
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
impl rand::distributions::uniform::SampleUniform for Lsn {
type Sampler = LsnSampler;
}
impl rand::distributions::uniform::UniformSampler for LsnSampler {
type X = Lsn;
fn new<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
low.borrow().0,
high.borrow().0,
),
)
}
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
low.borrow().0,
high.borrow().0,
),
)
}
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
Lsn(self.0.sample(rng))
}
}
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;

View File

@@ -125,9 +125,6 @@ where
// Wake everyone with an error.
let mut internal = self.internal.lock().unwrap();
// Block any future waiters from starting
internal.shutdown = true;
// This will steal the entire waiters map.
// When we drop it all waiters will be woken.
mem::take(&mut internal.waiters)

View File

@@ -1,7 +1,6 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) -> ! {
// SAFETY: exiting is safe, the ffi is not safe
pub fn exit_now(code: u8) {
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -85,13 +85,6 @@ impl Gate {
warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await
}
/// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
/// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
/// the CancellationToken on such types is analogous to "Did shutdown start?"
pub fn close_complete(&self) -> bool {
self.sem.is_closed()
}
async fn do_close(&self) {
tracing::debug!(gate = self.name, "Closing Gate...");
match self.sem.acquire_many(Self::MAX_UNITS).await {

View File

@@ -19,12 +19,13 @@ inotify.workspace = true
serde.workspace = true
serde_json.workspace = true
sysinfo.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio.workspace = true
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]
use anyhow::Context;

View File

@@ -188,7 +188,6 @@ extern "C" fn recovery_download(
}
}
#[allow(clippy::unnecessary_cast)]
extern "C" fn wal_read(
sk: *mut Safekeeper,
buf: *mut ::std::os::raw::c_char,
@@ -422,7 +421,6 @@ impl std::fmt::Display for Level {
}
/// Take ownership of `Vec<u8>` from StringInfoData.
#[allow(clippy::unnecessary_cast)]
pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
if pg.data.is_null() {
return None;

View File

@@ -186,7 +186,7 @@ impl Wrapper {
.unwrap()
.into_bytes_with_nul();
assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut i8;
let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;

View File

@@ -82,9 +82,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tokio-stream.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing-chrome = "0.7.1"
[dev-dependencies]
criterion.workspace = true

View File

@@ -1,15 +1,13 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use camino::Utf8Path;
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
@@ -22,7 +20,6 @@ use pageserver::{
};
use std::fs;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -48,13 +45,6 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
new_tenant_id: Option<TenantId>,
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -110,7 +100,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
Ok(())
}
LayerCmd::ListLayer {
path,
@@ -139,7 +128,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::DumpLayer {
path,
@@ -180,63 +168,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
macro_rules! rewrite_closure {
($($summary_ty:tt)*) => {{
|summary| $($summary_ty)* {
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
..summary
}
}};
}
let res = ImageLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(image_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of image layer {layer_file_path}");
return Ok(());
}
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(image_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
let res = DeltaLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(delta_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of delta layer {layer_file_path}");
return Ok(());
}
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(delta_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
}
Ok(())
}

View File

@@ -1,23 +0,0 @@
[package]
name = "pagebench"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
clap.workspace = true
futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
tokio-util.workspace = true
pageserver = { path = ".." }
utils = { path = "../../libs/utils/" }

View File

@@ -1,402 +0,0 @@
use anyhow::Context;
use pageserver::client::page_service::BasebackupRequest;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use utils::id::TenantId;
use utils::logging;
use std::cell::RefCell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::util::tenant_timeline_id::TenantTimelineId;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
runtime: Option<humantime::Duration>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(serde::Serialize)]
struct Output {
total: PerTaskOutput,
}
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
struct LatencyPercentiles {
latency_percentiles: [Duration; 4],
}
impl serde::Serialize for LatencyPercentiles {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}
ser.end()
}
}
#[derive(serde::Serialize)]
struct PerTaskOutput {
request_count: u64,
#[serde(with = "humantime_serde")]
latency_mean: Duration,
latency_percentiles: LatencyPercentiles,
}
struct ThreadLocalStats {
latency_histo: hdrhistogram::Histogram<u64>,
}
impl ThreadLocalStats {
fn new() -> Self {
Self {
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
// which would skew the benchmark results.
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
}
}
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
let micros: u64 = latency
.as_micros()
.try_into()
.context("latency greater than u64")?;
self.latency_histo
.record(micros)
.context("add to histogram")?;
Ok(())
}
fn output(&self) -> PerTaskOutput {
let latency_percentiles = std::array::from_fn(|idx| {
let micros = self
.latency_histo
.value_at_percentile(LATENCY_PERCENTILES[idx]);
Duration::from_micros(micros)
});
PerTaskOutput {
request_count: self.latency_histo.len(),
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
latency_percentiles: LatencyPercentiles {
latency_percentiles,
},
}
}
fn add(&mut self, other: &Self) {
let Self {
ref mut latency_histo,
} = self;
latency_histo.add(&other.latency_histo).unwrap();
}
}
thread_local! {
pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
Arc::new(Mutex::new(ThreadLocalStats::new()))
);
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_start({
let thread_local_stats = Arc::clone(&thread_local_stats);
move || {
// pre-initialize the histograms
STATS.with(|stats| {
let stats: Arc<_> = Arc::clone(&*stats.borrow());
thread_local_stats.lock().unwrap().push(stats);
});
}
})
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(args, thread_local_stats));
rt.block_on(main_task).unwrap()
}
struct Target {
timeline: TenantTimelineId,
lsn_range: Option<Range<Lsn>>,
}
async fn main_impl(
args: Args,
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
// discover targets
let mut timelines: Vec<TenantTimelineId> = Vec::new();
if args.targets.is_some() {
timelines = args.targets.clone().unwrap();
} else {
let tenants: Vec<TenantId> = mgmt_api_client
.list_tenants()
.await?
.into_iter()
.map(|ti| ti.id)
.collect();
let mut js = JoinSet::new();
for tenant_id in tenants {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
async move {
(
tenant_id,
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, tl_infos) = res.unwrap();
for tl in tl_infos {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id: tl.timeline_id,
});
}
}
}
info!("timelines:\n{:?}", timelines);
let mut js = JoinSet::new();
for timeline in &timelines {
js.spawn({
let timeline = *timeline;
let info = mgmt_api_client
.timeline_info(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
async move {
anyhow::Ok(Target {
timeline,
lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
})
}
});
}
let mut all_targets: Vec<Target> = Vec::new();
while let Some(res) = js.join_next().await {
all_targets.push(res.unwrap().unwrap());
}
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (timeline, work) = {
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
sender.send(work).await.ok().unwrap();
}
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
t.await.unwrap();
}
let output = Output {
total: {
let mut agg_stats = ThreadLocalStats::new();
for stats in thread_local_stats.lock().unwrap().iter() {
let stats = stats.lock().unwrap();
agg_stats.add(&*stats);
}
agg_stats.output()
},
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
start_work_barrier.wait().await;
let client =
pageserver::client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}

View File

@@ -1,404 +0,0 @@
use anyhow::Context;
use pageserver::client::page_service::RelTagBlockNo;
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::repository;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};
use utils::id::TenantId;
use utils::logging;
use std::cell::RefCell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::util::tenant_timeline_id::TenantTimelineId;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long)]
runtime: Option<humantime::Duration>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(serde::Serialize)]
struct Output {
total: PerTaskOutput,
}
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
struct LatencyPercentiles {
latency_percentiles: [Duration; 4],
}
impl serde::Serialize for LatencyPercentiles {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}
ser.end()
}
}
#[derive(serde::Serialize)]
struct PerTaskOutput {
request_count: u64,
#[serde(with = "humantime_serde")]
latency_mean: Duration,
latency_percentiles: LatencyPercentiles,
}
struct ThreadLocalStats {
latency_histo: hdrhistogram::Histogram<u64>,
}
impl ThreadLocalStats {
fn new() -> Self {
Self {
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
// which would skew the benchmark results.
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
}
}
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
let micros: u64 = latency
.as_micros()
.try_into()
.context("latency greater than u64")?;
self.latency_histo
.record(micros)
.context("add to histogram")?;
Ok(())
}
fn output(&self) -> PerTaskOutput {
let latency_percentiles = std::array::from_fn(|idx| {
let micros = self
.latency_histo
.value_at_percentile(LATENCY_PERCENTILES[idx]);
Duration::from_micros(micros)
});
PerTaskOutput {
request_count: self.latency_histo.len(),
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
latency_percentiles: LatencyPercentiles {
latency_percentiles,
},
}
}
fn add(&mut self, other: &Self) {
let Self {
ref mut latency_histo,
} = self;
latency_histo.add(&other.latency_histo).unwrap();
}
}
thread_local! {
pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
Arc::new(Mutex::new(ThreadLocalStats::new()))
);
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_start({
let thread_local_stats = Arc::clone(&thread_local_stats);
move || {
// pre-initialize the histograms
STATS.with(|stats| {
let stats: Arc<_> = Arc::clone(&*stats.borrow());
thread_local_stats.lock().unwrap().push(stats);
});
}
})
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(args, thread_local_stats));
rt.block_on(main_task).unwrap()
}
struct KeyRange {
timeline: TenantTimelineId,
timeline_lsn: Lsn,
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
async fn main_impl(
args: Args,
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
None, // TODO: support jwt in args
));
// discover targets
let mut timelines: Vec<TenantTimelineId> = Vec::new();
if args.targets.is_some() {
timelines = args.targets.clone().unwrap();
} else {
let tenants: Vec<TenantId> = mgmt_api_client
.list_tenants()
.await?
.into_iter()
.map(|ti| ti.id)
.collect();
let mut js = JoinSet::new();
for tenant_id in tenants {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
async move {
(
tenant_id,
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, tl_infos) = res.unwrap();
for tl in tl_infos {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id: tl.timeline_id,
});
}
}
}
info!("timelines:\n{:?}", timelines);
let mut js = JoinSet::new();
for timeline in &timelines {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
let timeline = *timeline;
async move {
let partitioning = mgmt_api_client
.keyspace(timeline.tenant_id, timeline.timeline_id)
.await?;
let lsn = partitioning.at_lsn;
let ranges = partitioning
.keys
.ranges
.iter()
.filter_map(|r| {
let start = r.start;
let end = r.end;
// filter out non-relblock keys
match (is_rel_block_key(start), is_rel_block_key(end)) {
(true, true) => Some(KeyRange {
timeline,
timeline_lsn: lsn,
start: start.to_i128(),
end: end.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
anyhow::Ok(ranges)
}
});
}
let mut all_ranges: Vec<KeyRange> = Vec::new();
while let Some(res) = js.join_next().await {
all_ranges.extend(res.unwrap().unwrap());
}
let weights =
rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len()))
.unwrap();
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
t.await.unwrap();
}
let output = Output {
total: {
let mut agg_stats = ThreadLocalStats::new();
for stats in thread_local_stats.lock().unwrap().iter() {
let stats = stats.lock().unwrap();
agg_stats.add(&*stats);
}
agg_stats.output()
},
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
start_work_barrier.wait().await;
let client =
pageserver::client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
while let Some((key, lsn)) = work.recv().await {
let start = Instant::now();
client
.getpage(key, lsn)
.await
.with_context(|| format!("getpage for {timeline}"))
.unwrap();
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}

View File

@@ -1,22 +0,0 @@
use clap::Parser;
pub(crate) mod util;
mod basebackup;
mod getpage_latest_lsn;
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
GetPageLatestLsn(getpage_latest_lsn::Args),
Basebackup(basebackup::Args),
}
fn main() {
let args = Args::parse();
match args {
Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
Args::Basebackup(args) => basebackup::main(args),
}
.unwrap()
}

View File

@@ -1,2 +0,0 @@
pub(crate) mod tenant_timeline_id;
pub(crate) mod connstring;

View File

@@ -1,8 +0,0 @@
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
let colon_and_jwt = if let Some(jwt) = jwt {
format!(":{jwt}") // TODO: urlescape
} else {
format!("")
};
format!("postgres://postgres{colon_and_jwt}@{host_port}")
}

View File

@@ -1,36 +0,0 @@
use std::str::FromStr;
use anyhow::Context;
use utils::id::TimelineId;
use utils::id::TenantId;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub(crate) struct TenantTimelineId {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
}
impl FromStr for TenantTimelineId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (tenant_id, timeline_id) = s
.split_once("/")
.context("tenant and timeline id must be separated by `/`")?;
let tenant_id = TenantId::from_str(&tenant_id)
.with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
let timeline_id = TimelineId::from_str(&timeline_id)
.with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
Ok(Self {
tenant_id,
timeline_id,
})
}
}
impl std::fmt::Display for TenantTimelineId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
}
}

View File

@@ -1,21 +1,22 @@
use utils::auth::{AuthError, Claims, Scope};
use anyhow::{bail, Result};
use utils::auth::{Claims, Scope};
use utils::id::TenantId;
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<()> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(
"Attempt to access management api with tenant scope. Permission denied".into(),
)),
(Scope::Tenant, None) => {
bail!("Attempt to access management api with tenant scope. Permission denied")
}
(Scope::Tenant, Some(tenant_id)) => {
if claims.tenant_id.unwrap() != tenant_id {
return Err(AuthError("Tenant id mismatch. Permission denied".into()));
bail!("Tenant id mismatch. Permission denied")
}
Ok(())
}
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
(Scope::SafekeeperData, _) => Err(AuthError(
"SafekeeperData scope makes no sense for Pageserver".into(),
)),
(Scope::SafekeeperData, _) => {
bail!("SafekeeperData scope makes no sense for Pageserver")
}
}
}

View File

@@ -166,111 +166,71 @@ where
}
}
debug!("Gather non-relational files from object storage pages");
// Gather non-relational files from object storage pages.
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
async {
debug!("list slru segments");
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
async {
debug!("add slru segment");
self.add_slru_segment(kind, segno).await?;
anyhow::Ok(())
}
.instrument(debug_span!("slru segment", ?segno))
.await?;
}
anyhow::Ok(())
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
}
.instrument(debug_span!("non-rel file", ?kind))
.await?;
}
let mut min_restart_lsn: Lsn = Lsn::MAX;
debug!("Create tablespace directories");
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
{
async {
debug!("iter");
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
debug!("list rels");
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
async {
debug!("iter");
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
return Ok(());
}
if self.full_backup {
if rel.forknum == MAIN_FORKNUM
&& rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
return Ok(());
}
self.add_rel(rel, rel).await?;
}
anyhow::Ok(())
}
.instrument(debug_span!("process rel", ?rel))
.await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}
debug!("list aux files");
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
async {
debug!("iter");
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
anyhow::Ok(())
if self.full_backup {
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
continue;
}
.instrument(debug_span!("process aux file", ?path))
.await?;
self.add_rel(rel, rel).await?;
}
debug!("done");
anyhow::Ok(())
}
.instrument(debug_span!(
"process tablespace directory",
?spcnode,
?dbnode
))
.await?;
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
}
}
if min_restart_lsn != Lsn::MAX {
info!(
@@ -284,25 +244,19 @@ where
.await
.context("could not add restart.lsn file to basebackup tarball")?;
}
debug!("list twophase files");
for xid in self
.timeline
.list_twophase_files(self.lsn, self.ctx)
.await?
{
async {
self.add_twophase_file(xid).await?;
anyhow::Ok(())
}
.instrument(debug_span!("process twophase file", ?xid))
.await?;
self.add_twophase_file(xid).await?;
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
});
debug!("Generate pg_control and bootstrap WAL segment.");
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
debug!("all tarred up!");

View File

@@ -34,11 +34,8 @@ use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::{JwtAuth, SwappableJwtAuth},
logging, project_build_tag, project_git_version,
sentry_init::init_sentry,
signals::Signal,
tcp_listener,
auth::JwtAuth, logging, project_build_tag, project_git_version, sentry_init::init_sentry,
signals::Signal, tcp_listener,
};
project_git_version!(GIT_VERSION);
@@ -103,11 +100,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
let _guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,
)?;
logging::init(conf.log_format, tracing_error_layer_enablement)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -328,12 +321,13 @@ fn start_pageserver(
let http_auth;
let pg_auth;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
// unwrap is ok because check is performed when creating config, so path is set and exists
// unwrap is ok because check is performed when creating config, so path is set and file exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
let jwt_auth = JwtAuth::from_key_path(key_path)?;
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
info!(
"Loading public key for verifying JWT tokens from {:#?}",
key_path
);
let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);
http_auth = match &conf.http_auth_type {
AuthType::Trust => None,
@@ -416,7 +410,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -426,7 +420,6 @@ fn start_pageserver(
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
@@ -555,7 +548,6 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
tenant_manager,
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),

View File

@@ -1,2 +0,0 @@
pub mod mgmt_api;
pub mod page_service;

View File

@@ -1,89 +0,0 @@
use anyhow::Context;
use hyper::{client::HttpConnector, Uri};
use utils::id::{TenantId, TimelineId};
pub struct Client {
mgmt_api_endpoint: String,
authorization_header: Option<String>,
client: hyper::Client<HttpConnector, hyper::Body>,
}
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
client: hyper::client::Client::new(),
}
}
pub async fn list_tenants(&self) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,
) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn timeline_info(
&self, tenant_id: TenantId, timeline_id: TimelineId,
) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn keyspace(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<crate::http::models::partitioning::Partitioning> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body).context("deserialize")?)
}
async fn get(&self, uri: Uri) -> hyper::Result<hyper::Response<hyper::Body>> {
let req = hyper::Request::builder().uri(uri).method("GET");
let req = if let Some(value) = &self.authorization_header {
req.header("Authorization", value)
} else {
req
};
let req = req.body(hyper::Body::default());
self.client.request(req.unwrap()).await
}
}

View File

@@ -1,145 +0,0 @@
use std::pin::Pin;
use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
},
reltag::RelTag,
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
pub struct Client {
client: tokio_postgres::Client,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct BasebackupRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>,
pub gzip: bool,
}
impl Client {
pub async fn new(connstring: String) -> anyhow::Result<Self> {
let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => { }
res = connection => {
res.unwrap();
}
}
}
});
Ok(Self {
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
conn_task,
client,
})
}
pub async fn pagestream(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<PagestreamClient> {
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
.client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop,
})
}
pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
let BasebackupRequest {
tenant_id,
timeline_id,
lsn,
gzip,
} = req;
let mut args = Vec::with_capacity(5);
args.push("basebackup".to_string());
args.push(format!("{tenant_id}"));
args.push(format!("{timeline_id}"));
if let Some(lsn) = lsn {
args.push(format!("{lsn}"));
}
if *gzip {
args.push(format!("--gzip"))
}
Ok(self.client.copy_out(&args.join(" ")).await?)
}
}
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct RelTagBlockNo {
pub rel_tag: RelTag,
pub block_no: u32,
}
impl PagestreamClient {
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next = next.unwrap().unwrap();
match PagestreamBeMessage::deserialize(next)? {
PagestreamBeMessage::Exists(_) => todo!(),
PagestreamBeMessage::Nblocks(_) => todo!(),
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::DbSize(_) => todo!(),
}
}
}

View File

@@ -161,7 +161,7 @@ pub struct PageServerConf {
pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute
pub pg_auth_type: AuthType,
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
/// Path to a file containing public key for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
@@ -1314,6 +1314,12 @@ broker_endpoint = '{broker_endpoint}'
assert_eq!(
parsed_remote_storage_config,
RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS
)
.unwrap(),
max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
.unwrap(),
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
},
"Remote storage config should correctly parse the local FS config and fill other storage defaults"
@@ -1374,6 +1380,8 @@ broker_endpoint = '{broker_endpoint}'
assert_eq!(
parsed_remote_storage_config,
RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: bucket_name.clone(),
bucket_region: bucket_region.clone(),

View File

@@ -261,7 +261,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_id, tenant_state, _gen) in tenants {
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
if state != TenantState::Active {
None
} else {

View File

@@ -345,7 +345,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
);
}
}
@@ -893,6 +893,14 @@ mod test {
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();

View File

@@ -55,24 +55,21 @@ impl Deleter {
/// Wrap the remote `delete_objects` with a failpoint
async fn remote_delete(&self) -> Result<(), anyhow::Error> {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
});
self.remote_storage.delete_objects(&self.accumulator).await
},
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,

View File

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state, _gen) in &tenants {
for (tenant_id, _state) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -1,4 +1,4 @@
pub mod routes;
pub use routes::make_router;
pub mod models;
pub use pageserver_api::models;

View File

@@ -1,3 +0,0 @@
//! If possible, use `::pageserver_api::models` instead.
pub mod partitioning;

View File

@@ -1,112 +0,0 @@
use utils::lsn::Lsn;
#[derive(Debug, PartialEq, Eq)]
pub struct Partitioning {
pub keys: crate::keyspace::KeySpace,
pub at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
pub struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
impl<'a> serde::Deserialize<'a> for Partitioning {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
pub struct KeySpace(crate::keyspace::KeySpace);
impl<'de> serde::Deserialize<'de> for KeySpace {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
#[serde(transparent)]
struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key);
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct Range(Key, Key);
let ranges: Vec<Range> = serde::Deserialize::deserialize(deserializer)?;
Ok(Self(crate::keyspace::KeySpace {
ranges: ranges
.into_iter()
.map(|Range(start, end)| (start.0..end.0))
.collect(),
}))
}
}
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct De {
keys: KeySpace,
#[serde_as(as = "serde_with::DisplayFromStr")]
at_lsn: Lsn,
}
let de: De = serde::Deserialize::deserialize(deserializer)?;
Ok(Self {
at_lsn: de.at_lsn,
keys: de.keys.0,
})
}
}

View File

@@ -52,31 +52,6 @@ paths:
schema:
type: object
/v1/reload_auth_validation_keys:
post:
description: Reloads the JWT public keys from their pre-configured location on disk.
responses:
"200":
description: The reload completed successfully.
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error (also hits if no keys were found)
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}:
parameters:
- name: tenant_id
@@ -352,8 +327,7 @@ paths:
in: query
required: true
schema:
type: string
format: hex
type: integer
description: A LSN to get the timestamp
responses:
"200":

View File

@@ -16,16 +16,18 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
@@ -33,8 +35,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
@@ -42,12 +44,8 @@ use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
auth::JwtAuth,
generation::Generation,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
@@ -61,12 +59,11 @@ use utils::{
};
// Imports only used for testing APIs
use pageserver_api::models::ConfigureFailpointsRequest;
use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
@@ -77,8 +74,7 @@ pub struct State {
impl State {
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
@@ -90,7 +86,6 @@ impl State {
.collect::<Vec<_>>();
Ok(Self {
conf,
tenant_manager,
auth,
allowlist_routes,
remote_storage,
@@ -304,7 +299,11 @@ async fn build_timeline_info(
// we're executing this function, we will outlive the timeline on-disk state.
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
.get_current_logical_size_non_incremental(
info.last_record_lsn,
CancellationToken::new(),
ctx,
)
.await?,
);
}
@@ -390,39 +389,13 @@ async fn status_handler(
json_response(StatusCode::OK, StatusResponse { id: config.id })
}
async fn reload_auth_validation_keys_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let config = get_config(&request);
let state = get_state(&request);
let Some(shared_auth) = &state.auth else {
return json_response(StatusCode::BAD_REQUEST, ());
};
// unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
match JwtAuth::from_key_path(key_path) {
Ok(new_auth) => {
shared_auth.swap(new_auth);
json_response(StatusCode::OK, ())
}
Err(e) => {
warn!("Error reloading public keys from {key_path:?}: {e:}");
json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
}
}
}
async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
check_permission(&request, Some(tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
@@ -431,7 +404,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -465,10 +438,7 @@ async fn timeline_create_handler(
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create",
tenant_id = %tenant_shard_id.tenant_id,
shard = %tenant_shard_id.shard_slug(),
timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -664,15 +634,14 @@ async fn timeline_delete_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -686,14 +655,11 @@ async fn tenant_detach_handler(
check_permission(&request, Some(tenant_id))?;
let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
// This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_shard_id,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
@@ -764,12 +730,11 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state, gen)| TenantInfo {
.map(|(id, state)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -798,7 +763,6 @@ async fn tenant_status(
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))
@@ -812,16 +776,13 @@ async fn tenant_delete_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// TODO openapi spec
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
.instrument(info_span!("tenant_delete_handler",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
.instrument(info_span!("tenant_delete_handler", %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -1151,10 +1112,9 @@ async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
@@ -1163,13 +1123,9 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
if let Err(e) =
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await
if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1184,14 +1140,20 @@ async fn put_tenant_location_config_handler(
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
state
.tenant_manager
.upsert_location(tenant_shard_id, location_conf, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
@@ -1424,10 +1386,70 @@ async fn timeline_collect_keyspace(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
struct Partitioning {
keys: crate::keyspace::KeySpace,
let check_serialization_roundtrip: bool =
parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false);
at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1436,22 +1458,9 @@ async fn timeline_collect_keyspace(
let keys = timeline
.collect_keyspace(at_lsn, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let res = crate::http::models::partitioning::Partitioning { keys, at_lsn };
if check_serialization_roundtrip {
(|| {
let ser = serde_json::ser::to_vec(&res).context("serialize")?;
let de: crate::http::models::partitioning::Partitioning =
serde_json::from_slice(&ser).context("deserialize")?;
anyhow::ensure!(de == res, "not equal");
info!("passed serialization rountrip check");
Ok(())
})()
.context("serialization rountrip")
.map_err(ApiError::InternalServerError)?;
}
json_response(StatusCode::OK, res)
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
}
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
.await
@@ -1638,8 +1647,6 @@ where
);
match handle.await {
// TODO: never actually return Err from here, always Ok(...) so that we can log
// spanned errors. Call api_error_handler instead and return appropriate Body.
Ok(result) => result,
Err(e) => {
// The handler task panicked. We have a global panic handler that logs the
@@ -1688,7 +1695,7 @@ where
pub fn make_router(
state: Arc<State>,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -1717,13 +1724,10 @@ pub fn make_router(
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)
})
.post("/v1/reload_auth_validation_keys", |r| {
api_handler(r, reload_auth_validation_keys_handler)
})
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
.delete("/v1/tenant/:tenant_shard_id", |r| {
.delete("/v1/tenant/:tenant_id", |r| {
api_handler(r, tenant_delete_handler)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
@@ -1735,13 +1739,13 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_shard_id/location_config", |r| {
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
.post("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
@@ -1785,7 +1789,7 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {

View File

@@ -5,7 +5,7 @@ use std::ops::Range;
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Debug, Default)]
pub struct KeySpace {
/// Contiguous ranges of keys that belong to the key space. In key order,
/// and with no overlap.

View File

@@ -1,5 +1,3 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod auth;
pub mod basebackup;
pub mod config;
@@ -25,7 +23,6 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod client;
pub mod failpoint_support;
use crate::task_mgr::TaskKind;

View File

@@ -638,7 +638,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
///
/// Operations:
/// - open ([`std::fs::OpenOptions::open`])
/// - close (dropping [`crate::virtual_file::VirtualFile`])
/// - close (dropping [`std::fs::File`])
/// - close-by-replace (close by replacement algorithm)
/// - read (`read_at`)
/// - write (`write_at`)
@@ -1225,6 +1225,15 @@ pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_WAIT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_wait_seconds",
"Time spent waiting for access to the Postgres WAL redo process",
redo_histogram_time_buckets!(),
)
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_RECORDS_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_records_histogram",
@@ -1252,46 +1261,6 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
}
#[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
pub(crate) enum WalRedoKillCause {
WalRedoProcessDrop,
NoLeakChildDrop,
Startup,
}
impl Default for WalRedoProcessCounters {
fn default() -> Self {
let started = register_int_counter!(
"pageserver_wal_redo_process_started_total",
"Number of WAL redo processes started",
)
.unwrap();
let killed = register_int_counter_vec!(
"pageserver_wal_redo_process_stopped_total",
"Number of WAL redo processes stopped",
&["cause"],
)
.unwrap();
Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),
}
}
}
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
Lazy::new(WalRedoProcessCounters::default);
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
pub struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
@@ -1959,6 +1928,7 @@ pub fn preinitialize_metrics() {
&READ_NUM_FS_LAYERS,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,
&WAL_REDO_WAIT_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
]

View File

@@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -40,7 +39,7 @@ use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
auth::{Claims, JwtAuth, Scope},
id::{TenantId, TimelineId},
lsn::Lsn,
simple_rcu::RcuReadGuard,
@@ -122,7 +121,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
listener_ctx: RequestContext,
@@ -190,7 +189,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
connection_ctx: RequestContext,
@@ -218,27 +217,9 @@ async fn page_service_conn_main(
// no write timeout is used, because the kernel is assumed to error writes after some time.
let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
let socket_timeout_ms = (|| {
fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
if let Some(avg_timeout_ms) = avg_timeout_ms {
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
let u = rand::random::<f32>();
((1.0 - u).ln() / (-avg)) as u64
} else {
default_timeout_ms
}
});
default_timeout_ms
})();
// A timeout here does not mean the client died, it can happen if it's just idle for
// a while: we will tear down this PageServerHandler and instantiate a new one if/when
// they reconnect.
socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
// timeout should be lower, but trying out multiple days for
// <https://github.com/neondatabase/neon/issues/4205>
socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
let socket = std::pin::pin!(socket);
// XXX: pgbackend.run() should take the connection_ctx,
@@ -271,7 +252,7 @@ async fn page_service_conn_main(
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
/// The context created for the lifetime of the connection
@@ -285,7 +266,7 @@ impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
@@ -512,11 +493,7 @@ impl PageServerHandler {
};
if let Err(e) = &response {
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_canceled(): [`Timeline::shutdown`]` has entered
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
if timeline.cancel.is_cancelled() {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
@@ -920,7 +897,7 @@ impl PageServerHandler {
// when accessing management api supply None as an argument
// when using to authorize tenant pass corresponding tenant id
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
if self.auth.is_none() {
// auth is set to Trust, nothing to check so just return ok
return Ok(());
@@ -932,7 +909,7 @@ impl PageServerHandler {
.claims
.as_ref()
.expect("claims presence already checked");
check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
check_permission(claims, tenant_id)
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
@@ -971,17 +948,16 @@ where
.auth
.as_ref()
.unwrap()
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
.map_err(|e| QueryError::Unauthorized(e.0))?;
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
return Err(QueryError::Unauthorized(
"jwt token scope is Tenant, but tenant id is missing".into(),
));
return Err(QueryError::Other(anyhow::anyhow!(
"jwt token scope is Tenant, but tenant id is missing"
)));
}
debug!(
"jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
info!(
"jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
data.claims.scope, data.claims.tenant_id,
);
@@ -1003,13 +979,9 @@ where
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
fail::fail_point!("simulated-bad-compute-connection", |_| {
info!("Hit failpoint for bad connection");
Err(QueryError::SimulatedConnectionError)
});
let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string:?}");
if query_string.starts_with("pagestream ") {
let (_, params_raw) = query_string.split_at("pagestream ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
@@ -1358,9 +1330,6 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
}
e => QueryError::Other(anyhow::anyhow!(e)),
}
}

View File

@@ -21,8 +21,8 @@ use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
@@ -30,33 +30,9 @@ pub type BlockNumber = u32;
#[derive(Debug)]
pub enum LsnForTimestamp {
/// Found commits both before and after the given timestamp
Present(Lsn),
/// Found no commits after the given timestamp, this means
/// that the newest data in the branch is older than the given
/// timestamp.
///
/// All commits <= LSN happened before the given timestamp
Future(Lsn),
/// The queried timestamp is past our horizon we look back at (PITR)
///
/// All commits > LSN happened after the given timestamp,
/// but any commits < LSN might have happened before or after
/// the given timestamp. We don't know because no data before
/// the given lsn is available.
Past(Lsn),
/// We have found no commit with a timestamp,
/// so we can't return anything meaningful.
///
/// The associated LSN is the lower bound value we can safely
/// create branches on, but no statement is made if it is
/// older or newer than the timestamp.
///
/// This variant can e.g. be returned right after a
/// cluster import.
NoData(Lsn),
}
@@ -68,25 +44,6 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
#[error(transparent)]
Decode(#[from] DeserializeError),
#[error(transparent)]
PageRead(PageReconstructError),
#[error("cancelled")]
Cancelled,
}
impl From<PageReconstructError> for CollectKeySpaceError {
fn from(err: PageReconstructError) -> Self {
match err {
PageReconstructError::Cancelled => Self::Cancelled,
err => Self::PageRead(err),
}
}
}
impl From<PageReconstructError> for CalculateLogicalSizeError {
fn from(pre: PageReconstructError) -> Self {
match pre {
@@ -368,11 +325,7 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
// We use this method to figure out the branching LSN for the new branch, but the
// GC cutoff could be before the branching point and we cannot create a new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
// on the safe side.
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
@@ -402,33 +355,30 @@ impl Timeline {
low = mid + 1;
}
}
// If `found_smaller == true`, `low = t + 1` where `t` is the target LSN,
// so the LSN of the last commit record before or at `search_timestamp`.
// Remove one from `low` to get `t`.
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
let commit_lsn = Lsn((low - 1) * 8);
match (found_smaller, found_larger) {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
// just after importing a cluster.
Ok(LsnForTimestamp::NoData(min_lsn))
Ok(LsnForTimestamp::NoData(max_lsn))
}
(true, false) => {
// Didn't find any commit timestamps larger than the request
Ok(LsnForTimestamp::Future(max_lsn))
}
(false, true) => {
// Didn't find any commit timestamps smaller than the request
Ok(LsnForTimestamp::Past(min_lsn))
Ok(LsnForTimestamp::Past(max_lsn))
}
(true, false) => {
// Only found commits with timestamps smaller than the request.
// It's still a valid case for branch creation, return it.
// And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
// case, anyway.
Ok(LsnForTimestamp::Future(commit_lsn))
(true, true) => {
// low is the LSN of the first commit record *after* the search_timestamp,
// Back off by one to get to the point just before the commit.
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
}
(true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
}
}
@@ -628,6 +578,7 @@ impl Timeline {
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -638,8 +589,12 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
if self.cancel.is_cancelled() {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
@@ -656,11 +611,11 @@ impl Timeline {
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
/// that LSN forwards).
pub(crate) async fn collect_keyspace(
pub async fn collect_keyspace(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<KeySpace, CollectKeySpaceError> {
) -> anyhow::Result<KeySpace> {
// Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new();
@@ -669,7 +624,7 @@ impl Timeline {
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
@@ -702,7 +657,7 @@ impl Timeline {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
@@ -720,7 +675,7 @@ impl Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
for xid in xids {
@@ -1749,7 +1704,6 @@ const AUX_FILES_KEY: Key = Key {
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (
@@ -1765,8 +1719,7 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
})
}
/// See [[key_to_rel_block]].
pub fn is_rel_block_key(key: Key) -> bool {
fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}

View File

@@ -1,11 +1,106 @@
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -34,9 +129,51 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),
@@ -60,70 +197,6 @@ impl Value {
}
}
#[cfg(test)]
mod test {
use super::*;
use bytes::Bytes;
use utils::bin_ser::BeSer;
macro_rules! roundtrip {
($orig:expr, $expected:expr) => {{
let orig: Value = $orig;
let actual = Value::ser(&orig).unwrap();
let expected: &[u8] = &$expected;
assert_eq!(utils::Hex(&actual), utils::Hex(expected));
let deser = Value::des(&actual).unwrap();
assert_eq!(orig, deser);
}};
}
#[test]
fn image_roundtrip() {
let image = Bytes::from_static(b"foobar");
let image = Value::Image(image);
#[rustfmt::skip]
let expected = [
// top level discriminator of 4 bytes
0x00, 0x00, 0x00, 0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
// foobar
0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
];
roundtrip!(image, expected);
}
#[test]
fn walrecord_postgres_roundtrip() {
let rec = NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"foobar"),
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// flattened discriminator of total 8 bytes
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
// will_init
0x01,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
// foobar
0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
];
roundtrip!(rec, expected);
}
}
///
/// Result of performing GC
///

View File

@@ -291,16 +291,6 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}
pub async fn request_redo(
&self,
key: crate::repository::Key,
@@ -1659,16 +1649,22 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
async fn compaction_iteration(
pub async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
) -> anyhow::Result<()> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
return Ok(());
}
// We should only be called once the tenant has activated.
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
@@ -1715,10 +1711,6 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -1849,13 +1841,7 @@ impl Tenant {
timelines.values().for_each(|timeline| {
let timeline = Arc::clone(timeline);
let span = Span::current();
js.spawn(async move {
if freeze_and_flush {
timeline.flush_and_shutdown().instrument(span).await
} else {
timeline.shutdown().instrument(span).await
}
});
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
})
};
tracing::info!("Waiting for timelines...");
@@ -3508,7 +3494,6 @@ pub(crate) mod harness {
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
});
@@ -3543,6 +3528,10 @@ pub(crate) mod harness {
let remote_fs_dir = conf.workdir.join("localfs");
std::fs::create_dir_all(&remote_fs_dir).unwrap();
let config = RemoteStorageConfig {
// TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
// TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
@@ -4742,7 +4731,7 @@ mod tests {
// Keeps uninit mark in place
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
.shutdown(false)
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
.await;
std::mem::forget(tline);

View File

@@ -20,14 +20,12 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
#[tracing::instrument(skip_all, fields(%offset), level = tracing::Level::DEBUG)]
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
tracing::debug!("reading blob");
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
@@ -329,7 +327,7 @@ mod tests {
let mut sz: u16 = rng.gen();
// Make 50% of the arrays small
if rng.gen() {
sz &= 63;
sz |= 63;
}
random_array(sz.into())
})

View File

@@ -141,7 +141,6 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
#[tracing::instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn read_blk(
&self,
blknum: u32,

View File

@@ -573,10 +573,10 @@ impl<const L: usize> BuildNode<L> {
BuildNode {
num_children: 0,
level,
prefix: Vec::new(),
prefix: Vec::with_capacity(16),
suffix_len: 0,
keys: Vec::new(),
values: Vec::new(),
keys: Vec::with_capacity(5024),
values: Vec::with_capacity(3140),
size: NODE_HDR_SIZE,
}
}

View File

@@ -181,7 +181,6 @@ impl LayerMap {
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -349,6 +350,10 @@ async fn fill_logical_sizes(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let cancel = tokio_util::sync::CancellationToken::new();
// be sure to cancel all spawned tasks if we are dropped
let _dg = cancel.clone().drop_guard();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
for seg in segments.iter() {
@@ -366,8 +371,15 @@ async fn fill_logical_sizes(
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
.in_current_span(),
calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
cause,
ctx,
cancel.child_token(),
)
.in_current_span(),
);
}
e.insert(cached_size);
@@ -475,13 +487,14 @@ async fn calculate_logical_size(
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
mod filename;
pub mod image_layer;
mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;

Some files were not shown because too many files have changed in this diff Show More