Compare commits

..

9 Commits

Author SHA1 Message Date
Alek Westover
4af6a4d5e8 hopefully unbroken wip 2023-07-18 08:45:39 -04:00
Alek Westover
b27fa34c00 pass aws creds via cli 2023-07-17 08:31:12 -04:00
Alek Westover
ca22453627 Merge branch 'alek_targz' of github.com:neondatabase/neon into alek_targz_default_on 2023-07-17 07:59:30 -04:00
Alek Westover
0a00869615 this should pass github tests, but will fail with my local cloud repo 2023-07-14 13:55:14 -04:00
Alek Westover
87eead5220 Update rfc 2023-07-14 10:54:16 -04:00
Alek Westover
3cf83014d4 patch rfc 2023-07-14 09:21:46 -04:00
Alek Westover
353a735acb @arpad-m suggested using as_slice instead of creating a cursor 2023-07-14 07:58:05 -04:00
Alek Westover
107ebd3d21 turn remote extensions on by default 2023-07-13 17:05:52 -04:00
Alek Westover
89c93457f3 Add support for remote extensions. When requested, downloads a tar.gz file for the extension and then organizes the contained files. For instance, placing .so files in sharelib. 2023-07-13 16:15:18 -04:00
157 changed files with 1432 additions and 2894 deletions

View File

@@ -12,11 +12,6 @@ opt-level = 3
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
[alias]
build_testing = ["build", "--features", "testing"]
neon = ["run", "--bin", "neon_local"]

View File

@@ -150,14 +150,6 @@ runs:
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
fi
# We use pytest-split plugin to run benchmarks in parallel on different CI runners
if [ "${TEST_SELECTION}" = "test_runner/performance" ] && [ "${{ inputs.build_type }}" != "remote" ]; then
mkdir -p $TEST_OUTPUT
poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/benchmark_durations.json"
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then

View File

@@ -1,55 +0,0 @@
name: Handle `approved-for-ci-run` label
# This workflow helps to run CI pipeline for PRs made by external contributors (from forks).
on:
pull_request:
types:
# Default types that triggers a workflow ([1]):
# - [1] https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#pull_request
- opened
- synchronize
- reopened
# Types that we wand to handle in addition to keep labels tidy:
- closed
# Actual magic happens here:
- labeled
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
jobs:
remove-label:
# Remove `approved-for-ci-run` label if the workflow is triggered by changes in a PR.
# The PR should be reviewed and labelled manually again.
runs-on: [ ubuntu-latest ]
if: |
contains(fromJSON('["opened", "synchronize", "reopened", "closed"]'), github.event.action) &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
steps:
- run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
create-branch:
# Create a local branch for an `approved-for-ci-run` labelled PR to run CI pipeline in it.
runs-on: [ ubuntu-latest ]
if: |
github.event.action == 'labeled' &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
steps:
- run: gh pr --repo "${GITHUB_REPOSITORY}" edit "${PR_NUMBER}" --remove-label "approved-for-ci-run"
- uses: actions/checkout@v3
with:
ref: main
- run: gh pr checkout "${PR_NUMBER}"
- run: git checkout -b "ci-run/pr-${PR_NUMBER}"
- run: git push --force origin "ci-run/pr-${PR_NUMBER}"

View File

@@ -5,7 +5,6 @@ on:
branches:
- main
- release
- ci-run/pr-*
pull_request:
defaults:
@@ -128,11 +127,6 @@ jobs:
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
if: ${{ !cancelled() }}
@@ -396,11 +390,13 @@ jobs:
strategy:
fail-fast: false
matrix:
pytest_split_group: [ 1, 2, 3, 4 ]
build_type: [ release ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 1
- name: Pytest benchmarks
uses: ./.github/actions/run-python-test-set
@@ -409,11 +405,9 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits ${{ strategy.job-total }} --group ${{ matrix.pytest_split_group }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}"
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
@@ -794,7 +788,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.13.1
VM_BUILDER_VERSION: v0.12.1
steps:
- name: Checkout
@@ -1007,8 +1001,6 @@ jobs:
done
- name: Upload postgres-extensions to S3
# TODO: Reenable step after switching to the new extensions format (tar-gzipped + index.json)
if: false
run: |
for BUCKET in $(echo ${S3_BUCKETS}); do
aws s3 cp --recursive --only-show-errors ./extensions-to-upload s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}

View File

@@ -3,8 +3,7 @@ name: Check neon with extra platform builds
on:
push:
branches:
- main
- ci-run/pr-*
- main
pull_request:
defaults:

125
Cargo.lock generated
View File

@@ -740,9 +740,6 @@ name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -925,7 +922,6 @@ dependencies = [
"url",
"utils",
"workspace_hack",
"zstd",
]
[[package]]
@@ -1979,15 +1975,6 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "jobserver"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.63"
@@ -2395,9 +2382,9 @@ dependencies = [
[[package]]
name = "opentelemetry"
version = "0.19.0"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f"
checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
@@ -2405,9 +2392,9 @@ dependencies = [
[[package]]
name = "opentelemetry-http"
version = "0.8.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a819b71d6530c4297b49b3cae2939ab3a8cc1b9f382826a1bc29dd0ca3864906"
checksum = "1edc79add46364183ece1a4542592ca593e6421c60807232f5b8f7a31703825d"
dependencies = [
"async-trait",
"bytes",
@@ -2418,9 +2405,9 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.12.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca"
checksum = "d1c928609d087790fc936a1067bdc310ae702bdf3b090c3f281b713622c8bbde"
dependencies = [
"async-trait",
"futures",
@@ -2436,47 +2423,48 @@ dependencies = [
[[package]]
name = "opentelemetry-proto"
version = "0.2.0"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c"
checksum = "d61a2f56df5574508dd86aaca016c917489e589ece4141df1b5e349af8d66c28"
dependencies = [
"futures",
"futures-util",
"opentelemetry",
"prost",
"tonic 0.8.3",
"tonic-build 0.8.4",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.11.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5"
checksum = "9b02e0230abb0ab6636d18e2ba8fa02903ea63772281340ccac18e0af3ec9eeb"
dependencies = [
"opentelemetry",
]
[[package]]
name = "opentelemetry_api"
version = "0.19.0"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [
"fnv",
"futures-channel",
"futures-util",
"indexmap",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
"urlencoding",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.19.0"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1"
checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [
"async-trait",
"crossbeam-channel",
@@ -2952,9 +2940,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.64"
version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da"
checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8"
dependencies = [
"unicode-ident",
]
@@ -3343,9 +3331,9 @@ dependencies = [
[[package]]
name = "reqwest-tracing"
version = "0.4.5"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8"
checksum = "783e8130d2427ddd7897dd3f814d4a3aea31b05deb42a4fdf8c18258fe5aefd1"
dependencies = [
"anyhow",
"async-trait",
@@ -3870,8 +3858,7 @@ dependencies = [
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
source = "git+https://github.com/neondatabase/sharded-slab.git?rev=98d16753ab01c61f0a028de44167307a00efea00#98d16753ab01c61f0a028de44167307a00efea00"
dependencies = [
"lazy_static",
]
@@ -4014,7 +4001,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic-build",
"tonic-build 0.9.2",
"tracing",
"utils",
"workspace_hack",
@@ -4115,7 +4102,7 @@ checksum = "4b55807c0344e1e6c04d7c965f5289c39a8d94ae23ed5c0b57aabac549f871c6"
dependencies = [
"filetime",
"libc",
"xattr 0.2.3",
"xattr",
]
[[package]]
@@ -4396,17 +4383,16 @@ dependencies = [
[[package]]
name = "tokio-tar"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
version = "0.3.0"
source = "git+https://github.com/neondatabase/tokio-tar.git?rev=404df61437de0feef49ba2ccdbdd94eb8ad6e142#404df61437de0feef49ba2ccdbdd94eb8ad6e142"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall 0.3.5",
"redox_syscall 0.2.16",
"tokio",
"tokio-stream",
"xattr 1.0.0",
"xattr",
]
[[package]]
@@ -4533,6 +4519,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease 0.1.25",
"proc-macro2",
"prost-build",
"quote",
"syn 1.0.109",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
@@ -4656,9 +4655,9 @@ dependencies = [
[[package]]
name = "tracing-opentelemetry"
version = "0.19.0"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600"
checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de"
dependencies = [
"once_cell",
"opentelemetry",
@@ -5309,7 +5308,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"cc",
"chrono",
"clap",
"clap_builder",
@@ -5381,15 +5379,6 @@ dependencies = [
"libc",
]
[[package]]
name = "xattr"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea263437ca03c1522846a4ddafbca2542d0ad5ed9b784909d4b27b76f62bc34a"
dependencies = [
"libc",
]
[[package]]
name = "xmlparser"
version = "0.13.5"
@@ -5410,33 +5399,3 @@ name = "zeroize"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zstd"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "6.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.8+zstd.1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
dependencies = [
"cc",
"libc",
"pkg-config",
]

View File

@@ -84,9 +84,9 @@ notify = "5.0.0"
num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.19.0"
opentelemetry-otlp = { version = "0.12.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.11.0"
opentelemetry = "0.18.0"
opentelemetry-otlp = { version = "0.11.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.10.0"
parking_lot = "0.12"
pbkdf2 = "0.12.1"
pin-project-lite = "0.2"
@@ -95,7 +95,7 @@ prost = "0.11"
rand = "0.8"
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
reqwest-retry = "0.2.2"
routerify = "3"
@@ -124,14 +124,13 @@ tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
@@ -149,6 +148,7 @@ postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -185,6 +185,11 @@ tonic-build = "0.9"
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,
# until async safekeepers patch is merged to the main.
sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" }
################# Binary contents sections
[profile.release]

View File

@@ -535,10 +535,10 @@ FROM build-deps AS pg-embedding-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
# eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703 made on 15/07/2023
# 2465f831ea1f8d49c1d74f8959adb7fc277d70cd made on 05/07/2023
# There is no release tag yet
RUN wget https://github.com/neondatabase/pg_embedding/archive/eeb3ba7c3a60c95b2604dd543c64b2f1bb4a3703.tar.gz -O pg_embedding.tar.gz && \
echo "030846df723652f99a8689ce63b66fa0c23477a7fd723533ab8a6b28ab70730f pg_embedding.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_embedding/archive/2465f831ea1f8d49c1d74f8959adb7fc277d70cd.tar.gz -O pg_embedding.tar.gz && \
echo "047af2b1f664a1e6e37867bd4eeaf5934fa27d6ba3d6c4461efa388ddf7cd1d5 pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \

View File

@@ -34,4 +34,3 @@ utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
zstd = "0.12.4"

View File

@@ -30,10 +30,10 @@
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"}
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"} \
//! ```
//!
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fs::File;
use std::panic;
use std::path::Path;
@@ -51,6 +51,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::launch_download_extensions;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
@@ -58,26 +59,43 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "5670669815";
const BUILD_TAG_DEFAULT: &str = "local";
const DEFAULT_REMOTE_EXT_CONFIG: &str = r#"{"bucket": "neon-dev-extensions", "region": "eu-central-1", "endpoint": null, "prefix": "5555"}"#;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
let pgbin_default = String::from("postgres");
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let remote_ext_config = matches
.get_one::<String>("remote-ext-config")
.map(|x| x.to_string());
// let remote_ext_config =
// Some(remote_ext_config.unwrap_or(DEFAULT_REMOTE_EXT_CONFIG.to_string()));
let ext_remote_storage = remote_ext_config.map(|x| {
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
init_remote_storage(&x, build_tag)
.expect("cannot initialize remote extension storage from config")
});
// creds used to connect to remote extensions bucket
// let aws_creds = matches.get_one::<String>("awscreds");
// if let Some(aws_creds) = aws_creds {
// // not sure if this is a bad idea?
// let aws_creds_dict: serde_json::Value = serde_json::from_str(aws_creds)?;
// std::env::set_var(
// "AWS_ACCESS_KEY_ID",
// aws_creds_dict["ID"].as_str().expect("config parse error"),
// );
// std::env::set_var(
// "AWS_SECRET_ACCESS_KEY",
// aws_creds_dict["key"].as_str().expect("config parse error"),
// );
// }
let http_port = *matches
.get_one::<u16>("http-port")
@@ -196,10 +214,7 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
ext_remote_paths: OnceLock::new(),
started_to_download_extensions: Mutex::new(HashSet::new()),
library_index: OnceLock::new(),
build_tag,
available_extensions: OnceLock::new(),
};
let compute = Arc::new(compute_node);
@@ -247,6 +262,9 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
let _download_extensions_handle =
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
@@ -385,6 +403,12 @@ fn cli() -> clap::Command {
.long("remote-ext-config")
.value_name("REMOTE_EXT_CONFIG"),
)
.arg(
Arg::new("awscreds")
.short('k')
.long("awscreds")
.value_name("AWS_CREDENTIALS"),
)
}
#[test]

View File

@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::io::BufRead;
@@ -22,7 +21,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::GenericRemoteStorage;
use crate::pg_helpers::*;
use crate::spec::*;
@@ -56,11 +55,8 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// (key: extension name, value: path to extension archive in remote storage)
pub ext_remote_paths: OnceLock<HashMap<String, RemotePath>>,
pub library_index: OnceLock<HashMap<String, String>>,
pub started_to_download_extensions: Mutex<HashSet<String>>,
pub build_tag: String,
// cached lists of available extensions and libraries
pub available_extensions: OnceLock<HashSet<String>>,
}
#[derive(Clone, Debug)]
@@ -739,67 +735,36 @@ LIMIT 100",
// If remote extension storage is configured,
// download extension control files
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom extensions: {:?}", &custom_ext);
let (ext_remote_paths, library_index) = extension_server::get_available_extensions(
let custom_ext_prefixes = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
let available_extensions = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext,
&self.build_tag,
&custom_ext_prefixes,
)
.await?;
self.ext_remote_paths
.set(ext_remote_paths)
.expect("this is the only time we set ext_remote_paths");
self.library_index
.set(library_index)
.expect("this is the only time we set library_index");
self.available_extensions
.set(available_extensions)
.expect("available_extensions.set error");
}
Ok(())
}
pub async fn download_extension(&self, ext_name: &str, is_library: bool) -> Result<()> {
pub async fn download_extension(&self, ext_name: &str) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
let mut real_ext_name = ext_name.to_string();
if is_library {
real_ext_name = real_ext_name.replace(".so", "");
real_ext_name = self
.library_index
.get()
.expect("must have already downloaded the library_index")[&real_ext_name]
.clone();
}
{
let mut started_to_download_extensions = self
.started_to_download_extensions
.lock()
.expect("bad lock");
if started_to_download_extensions.contains(&real_ext_name) {
info!(
"extension {:?} already exists, skipping download",
&ext_name
);
return Ok(());
} else {
started_to_download_extensions.insert(real_ext_name.clone());
}
}
extension_server::download_extension(
&real_ext_name,
&self
.ext_remote_paths
.get()
.expect("error accessing ext_remote_paths")[&real_ext_name],
ext_name,
remote_storage,
&self.pgbin,
&self.pgversion,
)
.await
}
@@ -844,13 +809,10 @@ LIMIT 100",
libs_vec.extend(preload_libs_vec);
}
info!("Download ext_index.json, find the extension paths");
self.prepare_external_extensions(compute_state).await?;
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let mut download_tasks = Vec::new();
for library in &libs_vec {
download_tasks.push(self.download_extension(library, true));
download_tasks.push(self.download_extension(library));
}
let results = join_all(download_tasks).await;
for result in results {

View File

@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_configurator(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
})
}

View File

@@ -1,77 +1,35 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory (share / lib)
// and put them in the right place in the postgres directory
/*
The layout of the S3 bucket is as follows:
5615610098 // this is an extension build number
├── v14
│   ├── extensions
│   │   ├── anon.tar.zst
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   ├── anon.tar.zst
│   └── embedding.tar.zst
└── ext_index.json
5615261079
├── v14
│   ├── extensions
│   │   └── anon.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   └── anon.tar.zst
└── ext_index.json
5623261088
├── v14
│   ├── extensions
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   └── embedding.tar.zst
└── ext_index.json
Note that build number cannot be part of prefix because we might need extensions
from other build numbers.
v14/ext_index.json
-- this contains information necessary to create control files
v14/extensions/test_ext1.tar.gz
-- this contains the library files and sql files necessary to create this extension
v14/extensions/custom_ext1.tar.gz
ext_index.json stores the control files and location of extension archives
The difference between a private and public extensions is determined by who can
load the extension this is specified in ext_index.json
We do not duplicate extension.tar.zst files.
We only upload a new one if it is updated.
*access* is controlled by spec
More specifically, here is an example ext_index.json
{
"embedding": {
"control_data": {
"embedding.control": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true"
},
"archive_path": "5623261088/v15/extensions/embedding.tar.zst"
},
"anon": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
},
"archive_path": "5615261079/v15/extensions/anon.tar.zst"
}
}
Speicially, ext_index.json has a list of public extensions, and a list of
extensions enabled for specific tenant-ids.
*/
use crate::compute::ComputeNode;
use anyhow::Context;
use anyhow::{self, Result};
use futures::future::join_all;
use flate2::read::GzDecoder;
use remote_storage::*;
use serde_json;
use std::collections::HashMap;
use std::io::Read;
use serde_json::{self, Value};
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::Path;
use std::str;
use std::sync::Arc;
use std::thread;
use tar::Archive;
use tokio::io::AsyncReadExt;
use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -103,148 +61,154 @@ pub fn get_pg_version(pgbin: &str) -> String {
panic!("Unsuported postgres version {human_version}");
}
// download control files for enabled_extensions
// return the paths in s3 to the archives containing the actual extension files
// for use in creating the extension
// download extension control files
// if custom_ext_prefixes is provided - search also in custom extension paths
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_extensions: &[String],
build_tag: &str,
) -> Result<(HashMap<String, RemotePath>, HashMap<String, String>)> {
custom_ext_prefixes: &[String],
) -> Result<HashSet<String>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let index_path = format!("{build_tag}/{pg_version}/ext_index.json");
let index_path = pg_version.to_owned() + "/ext_index.json";
let index_path = RemotePath::new(Path::new(&index_path)).context("error forming path")?;
info!("download ext_index.json from: {:?}", &index_path);
info!("download ext_index.json: {:?}", &index_path);
let mut download = better_download(remote_storage, &index_path).await?;
let mut ext_idx_buffer = Vec::new();
// TODO: potential optimization: cache ext_index.json
let mut download = remote_storage.download(&index_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut ext_idx_buffer)
.read_to_end(&mut write_data_buffer)
.await?;
let ext_index_str = match str::from_utf8(&write_data_buffer) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
#[derive(Debug, serde::Deserialize)]
struct Index {
public_extensions: Vec<String>,
library_index: HashMap<String, String>,
extension_data: HashMap<String, ExtensionData>,
}
let ext_index_full: Value = serde_json::from_str(ext_index_str)?;
let ext_index_full = ext_index_full.as_object().context("error parsing json")?;
let control_data = ext_index_full["control_data"]
.as_object()
.context("json parse error")?;
let enabled_extensions = ext_index_full["enabled_extensions"]
.as_object()
.context("json parse error")?;
info!("{:?}", control_data.clone());
info!("{:?}", enabled_extensions.clone());
#[derive(Debug, serde::Deserialize)]
struct ExtensionData {
control_data: HashMap<String, String>,
archive_path: String,
}
let ext_index_full = serde_json::from_slice::<Index>(&ext_idx_buffer)?;
let mut enabled_extensions = ext_index_full.public_extensions;
enabled_extensions.extend_from_slice(custom_extensions);
let library_index = ext_index_full.library_index;
let all_extension_data = ext_index_full.extension_data;
info!("library_index {:?}", &library_index);
info!("enabled_extensions: {:?}", enabled_extensions);
let mut ext_remote_paths = HashMap::new();
let mut file_create_tasks = Vec::new();
for extension in enabled_extensions {
let ext_data = &all_extension_data[&extension];
for (control_file, control_contents) in &ext_data.control_data {
let extension_name = control_file
.strip_suffix(".control")
.expect("control files must end in .control");
ext_remote_paths.insert(
extension_name.to_string(),
RemotePath::from_string(&ext_data.archive_path)?,
);
let control_path = local_sharedir.join(control_file);
info!("writing file {:?}{:?}", control_path, control_contents);
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
let mut prefixes = vec!["public".to_string()];
prefixes.extend(custom_ext_prefixes.to_owned());
info!("{:?}", &prefixes);
let mut all_extensions = HashSet::new();
for prefix in prefixes {
let prefix_extensions = match enabled_extensions.get(&prefix) {
Some(Value::Array(ext_name)) => ext_name,
_ => {
info!("prefix {} has no extensions", prefix);
continue;
}
};
info!("{:?}", prefix_extensions);
for ext_name in prefix_extensions {
all_extensions.insert(ext_name.as_str().context("json parse error")?.to_string());
}
}
let results = join_all(file_create_tasks).await;
for result in results {
result?;
for prefix in &all_extensions {
let control_contents = control_data[prefix].as_str().context("json parse error")?;
let control_path = local_sharedir.join(prefix.to_owned() + ".control");
info!("WRITING FILE {:?}{:?}", control_path, control_contents);
std::fs::write(control_path, control_contents)?;
}
info!("ext_remote_paths {:?}", ext_remote_paths);
Ok((ext_remote_paths, library_index))
Ok(all_extensions.into_iter().collect())
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
// download all sqlfiles (and possibly data files) for a given extension name
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
) -> Result<()> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
let mut download = better_download(remote_storage, ext_path).await?;
let mut download_buffer = Vec::new();
// TODO: potential optimization: only download the extension if it doesn't exist
// problem: how would we tell if it exists?
let ext_name = ext_name.replace(".so", "");
let ext_name_targz = ext_name.to_owned() + ".tar.gz";
if Path::new(&ext_name_targz).exists() {
info!("extension {:?} already exists", ext_name_targz);
return Ok(());
}
let ext_path = RemotePath::new(
&Path::new(pg_version)
.join("extensions")
.join(ext_name_targz.clone()),
)?;
info!(
"Start downloading extension {:?} from {:?}",
ext_name, ext_path
);
let mut download = remote_storage.download(&ext_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut download_buffer)
.read_to_end(&mut write_data_buffer)
.await?;
let mut decoder = Decoder::new(download_buffer.as_slice())?;
let mut decompress_buffer = Vec::new();
decoder.read_to_end(&mut decompress_buffer)?;
let mut archive = Archive::new(decompress_buffer.as_slice());
let unzip_dest = pgbin
.strip_suffix("/bin/postgres")
.expect("bad pgbin")
.to_string()
+ "/download_extensions";
archive.unpack(&unzip_dest)?;
let unzip_dest = pgbin.strip_suffix("/bin/postgres").expect("bad pgbin");
let tar = GzDecoder::new(write_data_buffer.as_slice());
let mut archive = Archive::new(tar);
archive.unpack(unzip_dest)?;
info!("Download + unzip {:?} completed successfully", &ext_path);
let sharedir_paths = (
unzip_dest.to_string() + "/share/extension",
Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
);
let libdir_paths = (
unzip_dest.to_string() + "/lib",
Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"),
);
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
for paths in [sharedir_paths, libdir_paths] {
let (zip_dir, real_dir) = paths;
info!("mv {zip_dir:?}/* {real_dir:?}");
for file in std::fs::read_dir(zip_dir)? {
let old_file = file?.path();
let new_file =
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
info!("moving {old_file:?} to {new_file:?}");
// extension download failed: Directory not empty (os error 39)
match std::fs::rename(old_file, new_file) {
Ok(()) => info!("move succeeded"),
Err(e) => {
warn!("move failed, probably because the extension already exists: {e}")
}
}
}
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let zip_sharedir = format!("{unzip_dest}/extensions/{ext_name}/share/extension");
info!("mv {zip_sharedir:?}/* {local_sharedir:?}");
for file in std::fs::read_dir(zip_sharedir)? {
let old_file = file?.path();
let new_file =
Path::new(&local_sharedir).join(old_file.file_name().context("error parsing file")?);
std::fs::rename(old_file, new_file)?;
}
let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
let zip_libdir = format!("{unzip_dest}/extensions/{ext_name}/lib");
info!("mv {zip_libdir:?}/* {local_libdir:?}");
for file in std::fs::read_dir(zip_libdir)? {
let old_file = file?.path();
let new_file =
Path::new(&local_libdir).join(old_file.file_name().context("error parsing file")?);
std::fs::rename(old_file, new_file)?;
}
Ok(())
}
// This function initializes the necessary structs to use remote storage (should be fairly cheap)
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
#[derive(Debug, serde::Deserialize)]
struct RemoteExtJson {
bucket: String,
region: String,
endpoint: Option<String>,
prefix: Option<String>,
}
let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
// This function initializes the necessary structs to use remmote storage (should be fairly cheap)
pub fn init_remote_storage(
remote_ext_config: &str,
default_prefix: &str,
) -> anyhow::Result<GenericRemoteStorage> {
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
let remote_ext_bucket = remote_ext_config["bucket"]
.as_str()
.context("config parse error")?;
let remote_ext_region = remote_ext_config["region"]
.as_str()
.context("config parse error")?;
let remote_ext_endpoint = remote_ext_config["endpoint"].as_str();
let remote_ext_prefix = remote_ext_config["prefix"]
.as_str()
.unwrap_or(default_prefix)
.to_string();
// TODO: potentially allow modification of other parameters
// however, default values should be fine for now
let config = S3Config {
bucket_name: remote_ext_json.bucket,
bucket_region: remote_ext_json.region,
prefix_in_bucket: remote_ext_json.prefix,
endpoint: remote_ext_json.endpoint,
bucket_name: remote_ext_bucket.to_string(),
bucket_region: remote_ext_region.to_string(),
prefix_in_bucket: Some(remote_ext_prefix),
endpoint: remote_ext_endpoint.map(|x| x.to_string()),
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
max_keys_per_list_response: None,
};
@@ -255,3 +219,19 @@ pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRem
};
GenericRemoteStorage::from_config(&config)
}
pub fn launch_download_extensions(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
thread::Builder::new()
.name("download-extensions".into())
.spawn(move || {
info!("start download_extension_files");
let compute_state = compute.state.lock().expect("error unlocking compute.state");
compute
.prepare_external_extensions(&compute_state)
.expect("error preparing extensions");
info!("download_extension_files done, exiting thread");
})
}

View File

@@ -125,23 +125,13 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
info!(
"serving /extension_server POST request, filename: {:?}",
&filename
);
match compute.download_extension(&filename, is_library).await {
match compute.download_extension(&filename).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);

View File

@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))?)
.spawn(move || watch_compute_activity(&state))
}

View File

@@ -19,7 +19,7 @@ const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // mil
/// Escape a string for including it in a SQL literal. Wrapping the result
/// with `E'{}'` or `'{}'` is not required, as it returns a ready-to-use
/// SQL string literal, e.g. `'db'''` or `E'db\\'`.
/// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
/// See https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47
/// for the original implementation.
pub fn escape_literal(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");

View File

@@ -10,7 +10,7 @@
//! (non-Neon binaries don't necessarily follow our pidfile conventions).
//! The pid stored in the file is later used to stop the service.
//!
//! See the [`lock_file`](utils::lock_file) module for more info.
//! See [`lock_file`] module for more info.
use std::ffi::OsStr;
use std::io::Write;

View File

@@ -2,9 +2,8 @@
//!
//! In the local test environment, the data for each safekeeper is stored in
//!
//! ```text
//! .neon/safekeepers/<safekeeper id>
//! ```
//!
use anyhow::Context;
use std::path::PathBuf;

View File

@@ -2,9 +2,7 @@
//!
//! In the local test environment, the data for each endpoint is stored in
//!
//! ```text
//! .neon/endpoints/<endpoint id>
//! ```
//!
//! Some basic information about the endpoint, like the tenant and timeline IDs,
//! are stored in the `endpoint.json` file. The `endpoint.json` file is created
@@ -24,7 +22,7 @@
//!
//! Directory contents:
//!
//! ```text
//! ```ignore
//! .neon/endpoints/main/
//! compute.log - log output of `compute_ctl` and `postgres`
//! endpoint.json - serialized `EndpointConf` struct
@@ -289,7 +287,7 @@ impl Endpoint {
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.get_compute_port()))
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
@@ -318,7 +316,7 @@ impl Endpoint {
.env
.safekeepers
.iter()
.map(|x| x.get_compute_port().to_string())
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
@@ -468,7 +466,7 @@ impl Endpoint {
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port));
}
}
@@ -499,7 +497,9 @@ impl Endpoint {
//
// The proper way to implement this is to pass the custom extension
// in spec, but we don't have a way to do that yet in the python tests.
custom_extensions: Some(vec!["kq_imcx".into()]),
// NEW HACK: we enable the anon custom extension for everyone! this is of course just for testing
// how will we do it for real?
custom_extensions: Some(vec!["123454321".to_string(), self.tenant_id.to_string()]),
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -137,7 +137,6 @@ impl Default for PageServerConf {
pub struct SafekeeperConf {
pub id: NodeId,
pub pg_port: u16,
pub pg_tenant_only_port: Option<u16>,
pub http_port: u16,
pub sync: bool,
pub remote_storage: Option<String>,
@@ -150,7 +149,6 @@ impl Default for SafekeeperConf {
Self {
id: NodeId(0),
pg_port: 0,
pg_tenant_only_port: None,
http_port: 0,
sync: true,
remote_storage: None,
@@ -160,14 +158,6 @@ impl Default for SafekeeperConf {
}
}
impl SafekeeperConf {
/// Compute is served by port on which only tenant scoped tokens allowed, if
/// it is configured.
pub fn get_compute_port(&self) -> u16 {
self.pg_tenant_only_port.unwrap_or(self.pg_port)
}
}
impl LocalEnv {
pub fn pg_distrib_dir_raw(&self) -> PathBuf {
self.pg_distrib_dir.clone()

View File

@@ -2,9 +2,8 @@
//!
//! In the local test environment, the data for each safekeeper is stored in
//!
//! ```text
//! .neon/safekeepers/<safekeeper id>
//! ```
//!
use std::io::Write;
use std::path::PathBuf;
use std::process::Child;
@@ -120,55 +119,45 @@ impl SafekeeperNode {
let availability_zone = format!("sk-{}", id_string);
let mut args = vec![
"-D".to_owned(),
datadir
.to_str()
.with_context(|| {
format!("Datadir path {datadir:?} cannot be represented as a unicode string")
})?
.to_owned(),
"--id".to_owned(),
id_string,
"--listen-pg".to_owned(),
listen_pg,
"--listen-http".to_owned(),
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"-D",
datadir.to_str().with_context(|| {
format!("Datadir path {datadir:?} cannot be represented as a unicode string")
})?,
"--id",
&id_string,
"--listen-pg",
&listen_pg,
"--listen-http",
&listen_http,
"--availability-zone",
&availability_zone,
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("127.0.0.1:{}", pg_tenant_only_port);
args.extend(["--listen-pg-tenant-only".to_owned(), listen_pg_tenant_only]);
}
if !self.conf.sync {
args.push("--no-sync".to_owned());
args.push("--no-sync");
}
let broker_endpoint = format!("{}", self.env.broker.client_url());
args.extend(["--broker-endpoint".to_owned(), broker_endpoint]);
args.extend(["--broker-endpoint", &broker_endpoint]);
let mut backup_threads = String::new();
if let Some(threads) = self.conf.backup_threads {
backup_threads = threads.to_string();
args.extend(["--backup-threads".to_owned(), backup_threads]);
args.extend(["--backup-threads", &backup_threads]);
} else {
drop(backup_threads);
}
if let Some(ref remote_storage) = self.conf.remote_storage {
args.extend(["--remote-storage".to_owned(), remote_storage.clone()]);
args.extend(["--remote-storage", remote_storage]);
}
let key_path = self.env.base_data_dir.join("auth_public_key.pem");
if self.conf.auth_enabled {
args.extend([
"--auth-validation-public-key-path".to_owned(),
key_path
.to_str()
.with_context(|| {
format!("Key path {key_path:?} cannot be represented as a unicode string")
})?
.to_owned(),
"--auth-validation-public-key-path",
key_path.to_str().with_context(|| {
format!("Key path {key_path:?} cannot be represented as a unicode string")
})?,
]);
}

View File

@@ -30,8 +30,8 @@ or similar, to wake up on shutdown.
In async Rust, futures can be "cancelled" at any await point, by
dropping the Future. For example, `tokio::select!` returns as soon as
one of the Futures returns, and drops the others. `tokio::time::timeout`
is another example. In the Rust ecosystem, some functions are
one of the Futures returns, and drops the others. `tokio::timeout!` is
another example. In the Rust ecosystem, some functions are
cancellation-safe, meaning they can be safely dropped without
side-effects, while others are not. See documentation of
`tokio::select!` for examples.
@@ -42,9 +42,9 @@ function that you call cannot be assumed to be async
cancellation-safe, and must be polled to completion.
The downside of non-cancellation safe code is that you have to be very
careful when using `tokio::select!`, `tokio::time::timeout`, and other
such functions that can cause a Future to be dropped. They can only be
used with functions that are explicitly documented to be cancellation-safe,
careful when using `tokio::select!`, `tokio::timeout!`, and other such
functions that can cause a Future to be dropped. They can only be used
with functions that are explicitly documented to be cancellation-safe,
or you need to spawn a separate task to shield from the cancellation.
At the entry points to the code, we also take care to poll futures to

View File

@@ -141,60 +141,29 @@ popular extensions.
The layout of the S3 bucket is as follows:
```
5615610098 // this is an extension build number
├── v14
│   ├── extensions
│   │   ├── anon.tar.zst
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   ├── anon.tar.zst
│   └── embedding.tar.zst
└── ext_index.json
5615261079
├── v14
│   ├── extensions
│   │   └── anon.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   └── anon.tar.zst
└── ext_index.json
5623261088
├── v14
│   ├── extensions
│   │   └── embedding.tar.zst
│   └── ext_index.json
└── v15
├── extensions
│   └── embedding.tar.zst
└── ext_index.json
v14/ext_index.json
-- this contains information necessary to create control files
v14/extensions/test_ext1.tar.gz
-- this contains the library files and sql files necessary to create this extension
v14/extensions/custom_ext1.tar.gz
```
Note that build number cannot be part of prefix because we might need extensions
from other build numbers.
ext_index.json stores the control files and location of extension archives
We do not duplicate extension.tar.zst files.
We only upload a new one if it is updated.
*access* is controlled by spec
More specifically, here is an example ext_index.json
The difference between private and public extensions is determined by who can
load the extension. This is specified in `ext_index.json`.
Speicially, `ext_index.json` has a list of public extensions, and a list of
extensions enabled for specific tenant-ids. Here is an example `ext_index.json`:
```
{
"embedding": {
"control_data": {
"embedding.control": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true"
},
"archive_path": "5623261088/v15/extensions/embedding.tar.zst"
"enabled_extensions": {
"123454321": [
"anon"
],
"public": [
"embedding"
]
},
"anon": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
},
"archive_path": "5615261079/v15/extensions/anon.tar.zst"
"control_data": {
"embedding": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true",
"anon": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
}
}
```
@@ -202,7 +171,7 @@ More specifically, here is an example ext_index.json
### How to add new extension to the Extension Storage?
Simply upload build artifacts to the S3 bucket.
Implement a CI step for that. Splitting it from compute-node-image build.
Implement a CI step for that. Splitting it from ompute-node-image build.
### How do we deal with extension versions and updates?

View File

@@ -1,22 +0,0 @@
# Useful development tools
This readme contains some hints on how to set up some optional development tools.
## ccls
[ccls](https://github.com/MaskRay/ccls) is a c/c++ language server. It requires some setup
to work well. There are different ways to do it but here's what works for me:
1. Make a common parent directory for all your common neon projects. (for example, `~/src/neondatabase/`)
2. Go to `vendor/postgres-v15`
3. Run `make clean && ./configure`
4. Install [bear](https://github.com/rizsotto/Bear), and run `bear -- make -j4`
5. Copy the generated `compile_commands.json` to `~/src/neondatabase` (or equivalent)
6. Run `touch ~/src/neondatabase/.ccls-root` this will make the `compile_commands.json` file discoverable in all subdirectories
With this setup you will get decent lsp mileage inside the postgres repo, and also any postgres extensions that you put in `~/src/neondatabase/`, like `pg_embedding`, or inside `~/src/neondatabase/neon/pgxn` as well.
Some additional tips for various IDEs:
### Emacs
To improve performance: `(setq lsp-lens-enable nil)`

View File

@@ -1,4 +1,4 @@
//! Helpers for observing duration on `HistogramVec` / `CounterVec` / `GaugeVec` / `MetricVec<T>`.
//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec<T>.
use std::{future::Future, time::Instant};

View File

@@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
@@ -77,12 +76,7 @@ pub enum TenantState {
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping {
// Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
// otherwise it will not be skipped during deserialization
#[serde(skip)]
progress: completion::Barrier,
},
Stopping,
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
@@ -124,7 +118,7 @@ impl TenantState {
// Why is Stopping a Maybe case? Because, during pageserver shutdown,
// we set the Stopping state irrespective of whether the tenant
// has finished attaching or not.
Self::Stopping { .. } => Maybe,
Self::Stopping => Maybe,
}
}
@@ -417,16 +411,12 @@ pub struct LayerResidenceEvent {
pub reason: LayerResidenceEventReason,
}
/// The reason for recording a given [`LayerResidenceEvent`].
/// The reason for recording a given [`ResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceEventReason {
/// The layer map is being populated, e.g. during timeline load or attach.
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
/// We need to record such events because there is no persistent storage for the events.
///
// https://github.com/rust-lang/rust/issues/74481
/// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
/// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
LayerLoad,
/// We just created the layer (e.g., freeze_and_flush or compaction).
/// Such layers are always [`LayerResidenceStatus::Resident`].
@@ -934,13 +924,7 @@ mod tests {
"Activating",
),
(line!(), TenantState::Active, "Active"),
(
line!(),
TenantState::Stopping {
progress: utils::completion::Barrier::default(),
},
"Stopping",
),
(line!(), TenantState::Stopping, "Stopping"),
(
line!(),
TenantState::Broken {

View File

@@ -60,9 +60,8 @@ impl Ord for RelTag {
/// Display RelTag in the same format that's used in most PostgreSQL debug messages:
///
/// ```text
/// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]
/// ```
///
impl fmt::Display for RelTag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(forkname) = forknumber_to_name(self.forknum) {

View File

@@ -49,16 +49,14 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
}
}
///
/// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple.
///
/// Formats:
///
/// ```text
/// <oid>
/// <oid>_<fork name>
/// <oid>.<segment number>
/// <oid>_<fork name>.<segment number>
/// ```
///
/// See functions relpath() and _mdfd_segpath() in PostgreSQL sources.
///

View File

@@ -5,11 +5,11 @@
//! It is similar to what tokio_util::codec::Framed with appropriate codec
//! provides, but `FramedReader` and `FramedWriter` read/write parts can be used
//! separately without using split from futures::stream::StreamExt (which
//! allocates a [Box] in polling internally). tokio::io::split is used for splitting
//! allocates box[1] in polling internally). tokio::io::split is used for splitting
//! instead. Plus we customize error messages more than a single type for all io
//! calls.
//!
//! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
//! [1] https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
use bytes::{Buf, BytesMut};
use std::{
future::Future,
@@ -117,7 +117,7 @@ impl<S: AsyncWrite + Unpin> Framed<S> {
impl<S: AsyncRead + AsyncWrite + Unpin> Framed<S> {
/// Split into owned read and write parts. Beware of potential issues with
/// using halves in different tasks on TLS stream:
/// <https://github.com/tokio-rs/tls/issues/40>
/// https://github.com/tokio-rs/tls/issues/40
pub fn split(self) -> (FramedReader<S>, FramedWriter<S>) {
let (read_half, write_half) = tokio::io::split(self.stream);
let reader = FramedReader {

View File

@@ -934,15 +934,6 @@ impl<'a> BeMessage<'a> {
}
}
fn terminate_code(code: &[u8; 5]) -> [u8; 6] {
let mut terminated = [0; 6];
for (i, &elem) in code.iter().enumerate() {
terminated[i] = elem;
}
terminated
}
#[cfg(test)]
mod tests {
use super::*;
@@ -974,3 +965,12 @@ mod tests {
assert_eq!(split_options(&params), ["foo bar", " \\", "baz ", "lol"]);
}
}
fn terminate_code(code: &[u8; 5]) -> [u8; 6] {
let mut terminated = [0; 6];
for (i, &elem) in code.iter().enumerate() {
terminated[i] = elem;
}
terminated
}

View File

@@ -24,7 +24,6 @@ use tokio::io;
use toml_edit::Item;
use tracing::info;
pub use self::s3_bucket::better_download;
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
@@ -35,12 +34,12 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// Currently, sync happens with AWS S3, that has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
@@ -51,12 +50,6 @@ const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(PathBuf);
impl std::fmt::Display for RemotePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.display())
}
}
impl RemotePath {
pub fn new(relative_path: &Path) -> anyhow::Result<Self> {
anyhow::ensure!(
@@ -66,10 +59,6 @@ impl RemotePath {
Ok(Self(relative_path.to_path_buf()))
}
pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
Self::new(Path::new(relative_path))
}
pub fn with_base(&self, base_path: &Path) -> PathBuf {
base_path.join(&self.0)
}

View File

@@ -151,7 +151,10 @@ impl RemoteStorage for LocalFs {
let mut files = vec![];
let mut directory_queue = vec![full_path.clone()];
while let Some(cur_folder) = directory_queue.pop() {
while !directory_queue.is_empty() {
let cur_folder = directory_queue
.pop()
.expect("queue cannot be empty: we just checked");
let mut entries = fs::read_dir(cur_folder.clone()).await?;
while let Some(entry) = entries.next_entry().await? {
let file_name: PathBuf = entry.file_name().into();

View File

@@ -31,8 +31,7 @@ use tracing::debug;
use super::StorageMetadata;
use crate::{
Download, DownloadError, GenericRemoteStorage, RemotePath, RemoteStorage, S3Config,
REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000;
@@ -132,39 +131,6 @@ struct GetObjectRequest {
key: String,
range: Option<String>,
}
use crate::GenericRemoteStorage::AwsS3;
// the regular download function adds a "/" to the start of file names in the
// case of prefix="None", which breaks everything. Thus, the following function is necessary
pub async fn better_download(
bucket: &GenericRemoteStorage,
from: &RemotePath,
) -> Result<Download, DownloadError> {
if let AwsS3(bucket) = bucket {
// this is more expected behavior.
// prefix="" should result in a trailing slash
// wheras prefix=None should **NOT** result in a trailing slash
let query_key = match &bucket.prefix_in_bucket {
Some(_) => bucket.relative_path_to_s3_object(from),
None => from
.get_path()
.to_str()
.expect("bad object name")
.to_string(),
};
bucket
.download_object(GetObjectRequest {
bucket: bucket.bucket_name.clone(),
key: query_key,
range: None,
})
.await
} else {
panic!("this isn't supposed to happen");
}
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {

View File

@@ -21,7 +21,7 @@ use crate::{SegmentMethod, SegmentSizeResult, SizeResult, StorageModel};
// 2. D+C+a+b
// 3. D+A+B
/// `Segment` which has had its size calculated.
/// [`Segment`] which has had it's size calculated.
#[derive(Clone, Debug)]
struct SegmentSize {
method: SegmentMethod,

View File

@@ -33,7 +33,7 @@ pub enum OtelName<'a> {
/// directly into HTTP servers. However, I couldn't find one for Hyper,
/// so I had to write our own. OpenTelemetry website has a registry of
/// instrumentation libraries at:
/// <https://opentelemetry.io/registry/?language=rust&component=instrumentation>
/// https://opentelemetry.io/registry/?language=rust&component=instrumentation
/// If a Hyper crate appears, consider switching to that.
pub async fn tracing_handler<F, R>(
req: Request<Body>,

View File

@@ -16,7 +16,7 @@ use crate::id::TenantId;
/// Algorithm to use. We require EdDSA.
const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
// Provides access to all data for a specific tenant (specified in `struct Claims` below)

View File

@@ -12,13 +12,6 @@ pub struct Completion(mpsc::Sender<()>);
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Default for Barrier {
fn default() -> Self {
let (_, rx) = channel();
rx
}
}
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
@@ -31,15 +24,6 @@ impl Barrier {
}
}
impl PartialEq for Barrier {
fn eq(&self, other: &Self) -> bool {
// we don't use dyn so this is good
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for Barrier {}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);

View File

@@ -1,111 +0,0 @@
/// Create a reporter for an error that outputs similar to [`anyhow::Error`] with Display with alternative setting.
///
/// It can be used with `anyhow::Error` as well.
///
/// Why would one use this instead of converting to `anyhow::Error` on the spot? Because
/// anyhow::Error would also capture a stacktrace on the spot, which you would later discard after
/// formatting.
///
/// ## Usage
///
/// ```rust
/// #[derive(Debug, thiserror::Error)]
/// enum MyCoolError {
/// #[error("should never happen")]
/// Bad(#[source] std::io::Error),
/// }
///
/// # fn failing_call() -> Result<(), MyCoolError> { Err(MyCoolError::Bad(std::io::ErrorKind::PermissionDenied.into())) }
///
/// # fn main() {
/// use utils::error::report_compact_sources;
///
/// if let Err(e) = failing_call() {
/// let e = report_compact_sources(&e);
/// assert_eq!(format!("{e}"), "should never happen: permission denied");
/// }
/// # }
/// ```
///
/// ## TODO
///
/// When we are able to describe return position impl trait in traits, this should of course be an
/// extension trait. Until then avoid boxing with this more ackward interface.
pub fn report_compact_sources<E: std::error::Error>(e: &E) -> impl std::fmt::Display + '_ {
struct AnyhowDisplayAlternateAlike<'a, E>(&'a E);
impl<E: std::error::Error> std::fmt::Display for AnyhowDisplayAlternateAlike<'_, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)?;
// why is E a generic parameter here? hope that rustc will see through a default
// Error::source implementation and leave the following out if there cannot be any
// sources:
Sources(self.0.source()).try_for_each(|src| write!(f, ": {}", src))
}
}
struct Sources<'a>(Option<&'a (dyn std::error::Error + 'static)>);
impl<'a> Iterator for Sources<'a> {
type Item = &'a (dyn std::error::Error + 'static);
fn next(&mut self) -> Option<Self::Item> {
let rem = self.0;
let next = self.0.and_then(|x| x.source());
self.0 = next;
rem
}
}
AnyhowDisplayAlternateAlike(e)
}
#[cfg(test)]
mod tests {
use super::report_compact_sources;
#[test]
fn report_compact_sources_examples() {
use std::fmt::Write;
#[derive(Debug, thiserror::Error)]
enum EvictionError {
#[error("cannot evict a remote layer")]
CannotEvictRemoteLayer,
#[error("stat failed")]
StatFailed(#[source] std::io::Error),
#[error("layer was no longer part of LayerMap")]
LayerNotFound(#[source] anyhow::Error),
}
let examples = [
(
line!(),
EvictionError::CannotEvictRemoteLayer,
"cannot evict a remote layer",
),
(
line!(),
EvictionError::StatFailed(std::io::ErrorKind::PermissionDenied.into()),
"stat failed: permission denied",
),
(
line!(),
EvictionError::LayerNotFound(anyhow::anyhow!("foobar")),
"layer was no longer part of LayerMap: foobar",
),
];
let mut s = String::new();
for (line, example, expected) in examples {
s.clear();
write!(s, "{}", report_compact_sources(&example)).expect("string grows");
assert_eq!(s, expected, "example on line {line}");
}
}
}

View File

@@ -14,7 +14,7 @@ pub async fn json_request<T: for<'de> Deserialize<'de>>(
.map_err(ApiError::BadRequest)
}
/// Will be removed as part of <https://github.com/neondatabase/neon/issues/4282>
/// Will be removed as part of https://github.com/neondatabase/neon/issues/4282
pub async fn json_request_or_empty_body<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<Option<T>, ApiError> {

View File

@@ -63,9 +63,6 @@ pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.
pub mod completion;
/// Reporting utilities
pub mod error;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")
@@ -133,8 +130,8 @@ pub use failpoint_macro_helpers::failpoint_sleep_helper;
/// Note that with git_version prefix is `git:` and in case of git version from env its `git-env:`.
///
/// #############################################################################################
/// TODO this macro is not the way the library is intended to be used, see <https://github.com/neondatabase/neon/issues/1565> for details.
/// We use `cachepot` to reduce our current CI build times: <https://github.com/neondatabase/cloud/pull/1033#issuecomment-1100935036>
/// TODO this macro is not the way the library is intended to be used, see https://github.com/neondatabase/neon/issues/1565 for details.
/// We use `cachepot` to reduce our current CI build times: https://github.com/neondatabase/cloud/pull/1033#issuecomment-1100935036
/// Yet, it seems to ignore the GIT_VERSION env variable, passed to Docker build, even with build.rs that contains
/// `println!("cargo:rerun-if-env-changed=GIT_VERSION");` code for cachepot cache invalidation.
/// The problem needs further investigation and regular `const` declaration instead of a macro.

View File

@@ -1,10 +1,9 @@
//! A module to create and read lock files.
//!
//! File locking is done using [`fcntl::flock`] exclusive locks.
//! The only consumer of this module is currently
//! [`pid_file`](crate::pid_file). See the module-level comment
//! there for potential pitfalls with lock files that are used
//! to store PIDs (pidfiles).
//! The only consumer of this module is currently [`pid_file`].
//! See the module-level comment there for potential pitfalls
//! with lock files that are used to store PIDs (pidfiles).
use std::{
fs,
@@ -82,7 +81,7 @@ pub fn create_exclusive(lock_file_path: &Path) -> anyhow::Result<UnwrittenLockFi
}
/// Returned by [`read_and_hold_lock_file`].
/// Check out the [`pid_file`](crate::pid_file) module for what the variants mean
/// Check out the [`pid_file`] module for what the variants mean
/// and potential caveats if the lock files that are used to store PIDs.
pub enum LockFileRead {
/// No file exists at the given path.

View File

@@ -112,7 +112,7 @@ pub fn init(
///
/// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
/// If the assumptions about the initialization order are not held, use
/// [`TracingPanicHookGuard::forget`] but keep in mind, if tracing is stopped, then panics will be
/// [`TracingPanicHookGuard::disarm`] but keep in mind, if tracing is stopped, then panics will be
/// lost.
#[must_use]
pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {

View File

@@ -23,9 +23,9 @@ pub enum SeqWaitError {
/// Monotonically increasing value
///
/// It is handy to store some other fields under the same mutex in `SeqWait<S>`
/// It is handy to store some other fields under the same mutex in SeqWait<S>
/// (e.g. store prev_record_lsn). So we allow SeqWait to be parametrized with
/// any type that can expose counter. `V` is the type of exposed counter.
/// any type that can expose counter. <V> is the type of exposed counter.
pub trait MonotonicCounter<V> {
/// Bump counter value and check that it goes forward
/// N.B.: new_val is an actual new value, not a difference.
@@ -90,7 +90,7 @@ impl<T: Ord> Eq for Waiter<T> {}
/// [`wait_for`]: SeqWait::wait_for
/// [`advance`]: SeqWait::advance
///
/// `S` means Storage, `V` is type of counter that this storage exposes.
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V>
where

View File

@@ -1,15 +1,8 @@
//! Assert that the current [`tracing::Span`] has a given set of fields.
//!
//! Can only produce meaningful positive results when tracing has been configured as in example.
//! Absence of `tracing_error::ErrorLayer` is not detected yet.
//!
//! `#[cfg(test)]` code will get a pass when using the `check_fields_present` macro in case tracing
//! is completly unconfigured.
//!
//! # Usage
//!
//! ```rust
//! # fn main() {
//! ```
//! use tracing_subscriber::prelude::*;
//! let registry = tracing_subscriber::registry()
//! .with(tracing_error::ErrorLayer::default());
@@ -27,18 +20,23 @@
//!
//! use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
//! let extractor = MultiNameExtractor::new("TestExtractor", ["test", "test_id"]);
//! if let Err(missing) = check_fields_present!([&extractor]) {
//! // if you copypaste this to a custom assert method, remember to add #[track_caller]
//! // to get the "user" code location for the panic.
//! panic!("Missing fields: {missing:?}");
//! match check_fields_present([&extractor]) {
//! Ok(()) => {},
//! Err(missing) => {
//! panic!("Missing fields: {:?}", missing.into_iter().map(|f| f.name() ).collect::<Vec<_>>());
//! }
//! }
//! # }
//! ```
//!
//! Recommended reading: <https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering>
//! Recommended reading: https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
//!
#[derive(Debug)]
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
pub enum ExtractionResult {
Present,
Absent,
@@ -73,101 +71,49 @@ impl<const L: usize> Extractor for MultiNameExtractor<L> {
}
}
/// Checks that the given extractors are satisfied with the current span hierarchy.
///
/// This should not be called directly, but used through [`check_fields_present`] which allows
/// `Summary::Unconfigured` only when the calling crate is being `#[cfg(test)]` as a conservative default.
#[doc(hidden)]
pub fn check_fields_present0<const L: usize>(
must_be_present: [&dyn Extractor; L],
) -> Result<Summary, Vec<&dyn Extractor>> {
let mut missing = must_be_present.into_iter().collect::<Vec<_>>();
let trace = tracing_error::SpanTrace::capture();
trace.with_spans(|md, _formatted_fields| {
// when trying to understand the inner workings of how does the matching work, note that
// this closure might be called zero times if the span is disabled. normally it is called
// once per span hierarchy level.
missing.retain(|extractor| match extractor.extract(md.fields()) {
ExtractionResult::Present => false,
ExtractionResult::Absent => true,
});
struct MemoryIdentity<'a>(&'a dyn Extractor);
// continue walking up until we've found all missing
!missing.is_empty()
});
if missing.is_empty() {
Ok(Summary::FoundEverything)
} else if !tracing_subscriber_configured() {
Ok(Summary::Unconfigured)
} else {
// we can still hit here if a tracing subscriber has been configured but the ErrorLayer is
// missing, which can be annoying. for this case, we could probably use
// SpanTrace::status().
//
// another way to end up here is with RUST_LOG=pageserver=off while configuring the
// logging, though I guess in that case the SpanTrace::status() == EMPTY would be valid.
// this case is covered by test `not_found_if_tracing_error_subscriber_has_wrong_filter`.
Err(missing)
impl<'a> MemoryIdentity<'a> {
fn as_ptr(&self) -> *const () {
self.0 as *const _ as *const ()
}
}
impl<'a> PartialEq for MemoryIdentity<'a> {
fn eq(&self, other: &Self) -> bool {
self.as_ptr() == other.as_ptr()
}
}
impl<'a> Eq for MemoryIdentity<'a> {}
impl<'a> Hash for MemoryIdentity<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ptr().hash(state);
}
}
impl<'a> fmt::Debug for MemoryIdentity<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:p}: {}", self.as_ptr(), self.0.name())
}
}
/// Checks that the given extractors are satisfied with the current span hierarchy.
///
/// The macro is the preferred way of checking if fields exist while passing checks if a test does
/// not have tracing configured.
///
/// Why mangled name? Because #[macro_export] will expose it at utils::__check_fields_present.
/// However we can game a module namespaced macro for `use` purposes by re-exporting the
/// #[macro_export] exported name with an alias (below).
#[doc(hidden)]
#[macro_export]
macro_rules! __check_fields_present {
($extractors:expr) => {{
{
use $crate::tracing_span_assert::{check_fields_present0, Summary::*, Extractor};
match check_fields_present0($extractors) {
Ok(FoundEverything) => Ok(()),
Ok(Unconfigured) if cfg!(test) => {
// allow unconfigured in tests
Ok(())
},
Ok(Unconfigured) => {
panic!("utils::tracing_span_assert: outside of #[cfg(test)] expected tracing to be configured with tracing_error::ErrorLayer")
},
Err(missing) => Err(missing)
}
}
}}
}
pub use crate::__check_fields_present as check_fields_present;
/// Explanation for why the check was deemed ok.
///
/// Mainly useful for testing, or configuring per-crate behaviour as in with
/// [`check_fields_present`].
#[derive(Debug)]
pub enum Summary {
/// All extractors were found.
///
/// Should only happen when tracing is properly configured.
FoundEverything,
/// Tracing has not been configured at all. This is ok for tests running without tracing set
/// up.
Unconfigured,
}
fn tracing_subscriber_configured() -> bool {
let mut noop_configured = false;
tracing::dispatcher::get_default(|d| {
// it is possible that this closure will not be invoked, but the current implementation
// always invokes it
noop_configured = d.is::<tracing::subscriber::NoSubscriber>();
/// The extractor names passed as keys to [`new`].
pub fn check_fields_present<const L: usize>(
must_be_present: [&dyn Extractor; L],
) -> Result<(), Vec<&dyn Extractor>> {
let mut missing: HashSet<MemoryIdentity> =
HashSet::from_iter(must_be_present.into_iter().map(|r| MemoryIdentity(r)));
let trace = tracing_error::SpanTrace::capture();
trace.with_spans(|md, _formatted_fields| {
missing.retain(|extractor| match extractor.0.extract(md.fields()) {
ExtractionResult::Present => false,
ExtractionResult::Absent => true,
});
!missing.is_empty() // continue walking up until we've found all missing
});
!noop_configured
if missing.is_empty() {
Ok(())
} else {
Err(missing.into_iter().map(|mi| mi.0).collect())
}
}
#[cfg(test)]
@@ -177,36 +123,6 @@ mod tests {
use super::*;
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl<'a> MemoryIdentity<'a> {
fn as_ptr(&self) -> *const () {
self.0 as *const _ as *const ()
}
}
impl<'a> PartialEq for MemoryIdentity<'a> {
fn eq(&self, other: &Self) -> bool {
self.as_ptr() == other.as_ptr()
}
}
impl<'a> Eq for MemoryIdentity<'a> {}
impl<'a> Hash for MemoryIdentity<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ptr().hash(state);
}
}
impl<'a> fmt::Debug for MemoryIdentity<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:p}: {}", self.as_ptr(), self.0.name())
}
}
struct Setup {
_current_thread_subscriber_guard: tracing::subscriber::DefaultGuard,
tenant_extractor: MultiNameExtractor<2>,
@@ -243,8 +159,7 @@ mod tests {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
let res = check_fields_present0([&setup.tenant_extractor, &setup.timeline_extractor]);
assert!(matches!(res, Ok(Summary::FoundEverything)), "{res:?}");
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
@@ -252,8 +167,8 @@ mod tests {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present0([&setup.tenant_extractor, &setup.timeline_extractor])
.unwrap_err();
let missing =
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
@@ -270,8 +185,7 @@ mod tests {
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
let res = check_fields_present0([&setup.tenant_extractor, &setup.timeline_extractor]);
assert!(matches!(res, Ok(Summary::FoundEverything)), "{res:?}");
check_fields_present([&setup.tenant_extractor, &setup.timeline_extractor]).unwrap();
}
#[test]
@@ -284,7 +198,7 @@ mod tests {
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present0([&setup.tenant_extractor]).unwrap_err();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
@@ -293,8 +207,7 @@ mod tests {
let setup = setup_current_thread();
let span = tracing::info_span!("root", tenant_id = "tenant-1", timeline_id = "timeline-1");
let _guard = span.enter();
let res = check_fields_present0([&setup.tenant_extractor]);
assert!(matches!(res, Ok(Summary::FoundEverything)), "{res:?}");
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
@@ -310,8 +223,7 @@ mod tests {
let span = tracing::info_span!("grandchild", timeline_id = "timeline-1");
let _guard = span.enter();
let res = check_fields_present0([&setup.tenant_extractor]);
assert!(matches!(res, Ok(Summary::FoundEverything)), "{res:?}");
check_fields_present([&setup.tenant_extractor]).unwrap();
}
#[test]
@@ -319,7 +231,7 @@ mod tests {
let setup = setup_current_thread();
let span = tracing::info_span!("root", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present0([&setup.tenant_extractor]).unwrap_err();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
@@ -333,107 +245,43 @@ mod tests {
let span = tracing::info_span!("child", timeline_id = "timeline-1");
let _guard = span.enter();
let missing = check_fields_present0([&setup.tenant_extractor]).unwrap_err();
let missing = check_fields_present([&setup.tenant_extractor]).unwrap_err();
assert_missing(missing, vec![&setup.tenant_extractor]);
}
#[test]
fn tracing_error_subscriber_not_set_up_straight_line() {
fn tracing_error_subscriber_not_set_up() {
// no setup
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractor = MultiNameExtractor::new("E", ["e"]);
let res = check_fields_present0([&extractor]);
assert!(matches!(res, Ok(Summary::Unconfigured)), "{res:?}");
// similarly for a not found key
let extractor = MultiNameExtractor::new("F", ["foobar"]);
let res = check_fields_present0([&extractor]);
assert!(matches!(res, Ok(Summary::Unconfigured)), "{res:?}");
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
#[test]
fn tracing_error_subscriber_not_set_up_with_instrument() {
// no setup
// demo a case where span entering is used to establish a parent child connection, but
// when we re-enter the subspan SpanTrace::with_spans iterates over nothing.
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let subspan = tracing::info_span!("bar", f = "foobar");
drop(_guard);
// normally this would work, but without any tracing-subscriber configured, both
// check_field_present find nothing
let _guard = subspan.enter();
let extractors: [&dyn Extractor; 2] = [
&MultiNameExtractor::new("E", ["e"]),
&MultiNameExtractor::new("F", ["f"]),
];
let res = check_fields_present0(extractors);
assert!(matches!(res, Ok(Summary::Unconfigured)), "{res:?}");
// similarly for a not found key
let extractor = MultiNameExtractor::new("G", ["g"]);
let res = check_fields_present0([&extractor]);
assert!(matches!(res, Ok(Summary::Unconfigured)), "{res:?}");
}
#[test]
fn tracing_subscriber_configured() {
// this will fail if any utils::logging::init callers appear, but let's hope they do not
// appear.
assert!(!super::tracing_subscriber_configured());
let _g = setup_current_thread();
assert!(super::tracing_subscriber_configured());
}
#[test]
fn not_found_when_disabled_by_filter() {
#[should_panic]
fn panics_if_tracing_error_subscriber_has_wrong_filter() {
let r = tracing_subscriber::registry().with({
tracing_error::ErrorLayer::default().with_filter(tracing_subscriber::filter::filter_fn(
|md| !(md.is_span() && *md.level() == tracing::Level::INFO),
))
tracing_error::ErrorLayer::default().with_filter(
tracing_subscriber::filter::dynamic_filter_fn(|md, _| {
if md.is_span() && *md.level() == tracing::Level::INFO {
return false;
}
true
}),
)
});
let _guard = tracing::subscriber::set_default(r);
// this test is a rather tricky one, it has a number of possible outcomes depending on the
// execution order when executed with other tests even if no test sets the global default
// subscriber.
let span = tracing::info_span!("foo", e = "some value");
let _guard = span.enter();
let extractors: [&dyn Extractor; 1] = [&MultiNameExtractor::new("E", ["e"])];
if span.is_disabled() {
// the tests are running single threaded, or we got lucky and no other tests subscriber
// was got to register their per-CALLSITE::META interest between `set_default` and
// creation of the span, thus the filter got to apply and registered interest of Never,
// so the span was never created.
//
// as the span is disabled, no keys were recorded to it, leading check_fields_present0
// to find an error.
let missing = check_fields_present0(extractors).unwrap_err();
assert_missing(missing, vec![extractors[0]]);
} else {
// when the span is enabled, it is because some other test is running at the same time,
// and that tests registry has filters which are interested in our above span.
//
// because the span is now enabled, all keys will be found for it. the
// tracing_error::SpanTrace does not consider layer filters during the span hierarchy
// walk (SpanTrace::with_spans), nor is the SpanTrace::status a reliable indicator in
// this test-induced issue.
let res = check_fields_present0(extractors);
assert!(matches!(res, Ok(Summary::FoundEverything)), "{res:?}");
}
let extractor = MultiNameExtractor::new("E", ["e"]);
let missing = check_fields_present([&extractor]).unwrap_err();
assert_missing(missing, vec![&extractor]);
}
}

View File

@@ -82,7 +82,6 @@ strum_macros.workspace = true
criterion.workspace = true
hex-literal.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
[[bench]]
name = "bench_layer_map"

View File

@@ -7,10 +7,10 @@
//! - The y axis represents LSN, growing upwards.
//!
//! Coordinates in both axis are compressed for better readability.
//! (see <https://medium.com/algorithms-digest/coordinate-compression-2fff95326fb>)
//! (see https://medium.com/algorithms-digest/coordinate-compression-2fff95326fb)
//!
//! Example use:
//! ```bash
//! ```
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! $ firefox out.svg
@@ -20,7 +20,7 @@
//! or from pageserver log files.
//!
//! TODO Consider shipping this as a grafana panel plugin:
//! <https://grafana.com/tutorials/build-a-panel-plugin/>
//! https://grafana.com/tutorials/build-a-panel-plugin/
use anyhow::Result;
use pageserver::repository::Key;
use std::cmp::Ordering;

View File

@@ -19,6 +19,12 @@ use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;
/// NB: This relies on a modified version of tokio_tar that does *not* write the
/// end-of-archive marker (1024 zero bytes), when the Builder struct is dropped
/// without explicitly calling 'finish' or 'into_inner'!
///
/// See https://github.com/neondatabase/tokio-tar/pull/1
///
use tokio_tar::{Builder, EntryType, Header};
use crate::context::RequestContext;

View File

@@ -396,8 +396,8 @@ fn start_pageserver(
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await {
Ok(_) => {
let init_sizes_done = tokio::select! {
_ = &mut init_sizes_done => {
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
@@ -406,7 +406,7 @@ fn start_pageserver(
);
None
}
Err(_) => {
_ = tokio::time::sleep(timeout) => {
tracing::info!(
timeout_millis = timeout.as_millis(),
"Initial logical size timeout elapsed; starting background jobs"

View File

@@ -171,13 +171,11 @@ pub struct PageServerConf {
pub log_format: LogFormat,
/// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
/// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
/// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
/// See the comment in `eviction_task` for details.
///
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
// How often to collect metrics and send them to the metrics endpoint.
@@ -995,8 +993,6 @@ impl ConfigurableSemaphore {
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
/// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
/// behave like [`futures::future::pending`], just waiting until new permits are added.
///
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
pub fn new(initial_permits: NonZeroUsize) -> Self {
ConfigurableSemaphore {
initial_permits,

View File

@@ -179,9 +179,6 @@ impl RequestContext {
/// a context and you are unwilling to change all callers to provide one.
///
/// Before we add cancellation, we should get rid of this method.
///
/// [`attached_child`]: Self::attached_child
/// [`detached_child`]: Self::detached_child
pub fn todo_child(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
Self::new(task_kind, download_behavior)
}

View File

@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline},
tenant::{self, storage_layer::PersistentLayer, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -166,11 +166,11 @@ async fn disk_usage_eviction_task(
.await;
let sleep_until = start + task_config.period;
if tokio::time::timeout_at(sleep_until, cancel.cancelled())
.await
.is_ok()
{
break;
tokio::select! {
_ = tokio::time::sleep_until(sleep_until) => {},
_ = cancel.cancelled() => {
break
}
}
}
}
@@ -390,22 +390,13 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
match result {
Some(Ok(())) => {
Some(Ok(true)) => {
usage_assumed.add_available_bytes(layer.file_size());
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
}
Some(Err(EvictionError::FileNotFound)) => {
evictions_failed.file_sizes += layer.file_size();
evictions_failed.count += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_)
| e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(%layer, "failed to evict layer: {e}");
Some(Ok(false)) => {
// this is:
// - Replacement::{NotFound, Unexpected}
// - it cannot be is_remote_layer, filtered already
evictions_failed.file_sizes += layer.file_size();
evictions_failed.count += 1;
}
@@ -413,6 +404,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
assert!(cancel.is_cancelled());
return;
}
Some(Err(e)) => {
// we really shouldn't be getting this, precondition failure
error!("failed to evict layer: {:#}", e);
}
}
}
}

View File

@@ -1143,7 +1143,7 @@ async fn disk_usage_eviction_run(
let Some(storage) = state.remote_storage.clone() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
)))
};
let state = state.disk_usage_eviction_state.clone();

View File

@@ -385,7 +385,7 @@ pub static UNEXPECTED_ONDEMAND_DOWNLOADS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
/// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
/// Each [`Timeline`]'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
#[derive(Debug)]
pub struct EvictionsWithLowResidenceDuration {
data_source: &'static str,
@@ -541,17 +541,6 @@ pub static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
// keep in sync with control plane Go code so that we can validate
// compute's basebackup_ms metric with our perspective in the context of SLI/SLO.
static COMPUTE_STARTUP_BUCKETS: Lazy<[f64; 28]> = Lazy::new(|| {
// Go code uses milliseconds. Variable is called `computeStartupBuckets`
[
5, 10, 20, 30, 50, 70, 100, 120, 150, 200, 250, 300, 350, 400, 450, 500, 600, 800, 1000,
1500, 2000, 2500, 3000, 5000, 10000, 20000, 40000, 60000,
]
.map(|ms| (ms as f64) / 1000.0)
});
pub struct BasebackupQueryTime(HistogramVec);
pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
BasebackupQueryTime({
@@ -559,7 +548,7 @@ pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
"pageserver_basebackup_query_seconds",
"Histogram of basebackup queries durations, by result type",
&["result"],
COMPUTE_STARTUP_BUCKETS.to_vec(),
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
})
@@ -829,7 +818,7 @@ pub static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
/// Similar to [`prometheus::HistogramTimer`] but does not record on drop.
pub struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
start: Instant,
@@ -887,7 +876,7 @@ impl StorageTimeMetrics {
/// Starts timing a new operation.
///
/// Note: unlike `prometheus::HistogramTimer` the returned timer does not record on drop.
/// Note: unlike [`prometheus::HistogramTimer`] the returned timer does not record on drop.
pub fn start_timer(&self) -> StorageTimeMetricsTimer {
StorageTimeMetricsTimer::new(self.clone())
}
@@ -1267,7 +1256,7 @@ impl RemoteTimelineClientMetrics {
/// Update the metrics that change when a call to the remote timeline client instance starts.
///
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`](Self::call_end) if that
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
/// is more suitable.
/// Never do both.
pub(crate) fn call_begin(
@@ -1300,7 +1289,7 @@ impl RemoteTimelineClientMetrics {
/// Manually udpate the metrics that track completions, instead of using the guard object.
/// Using the guard object is generally preferable.
/// See [`call_begin`](Self::call_begin) for more context.
/// See [`call_begin`] for more context.
pub(crate) fn call_end(
&self,
file_kind: &RemoteOpFileKind,

View File

@@ -1131,7 +1131,7 @@ impl<'a> DatadirModification<'a> {
/// context, breaking the atomicity is OK. If the import is interrupted, the
/// whole import fails and the timeline will be deleted anyway.
/// (Or to be precise, it will be left behind for debugging purposes and
/// ignored, see <https://github.com/neondatabase/neon/pull/1809>)
/// ignored, see https://github.com/neondatabase/neon/pull/1809)
///
/// Note: A consequence of flushing the pending operations is that they
/// won't be visible to subsequent operations until `commit`. The function

View File

@@ -205,7 +205,7 @@ pub enum TaskKind {
///
/// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
/// That abstraction doesn't use `task_mgr`.
/// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
/// The [`WalReceiverManager`] task ensures that this `TaskHandle` task does not outlive the [`WalReceiverManager`] task.
/// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
///
/// Once the connection is established, the `TaskHandle` task creates a
@@ -213,21 +213,16 @@ pub enum TaskKind {
/// the `Connection` object.
/// A `CancellationToken` created by the `TaskHandle` task ensures
/// that the [`WalReceiverConnectionPoller`] task will cancel soon after as the `TaskHandle` is dropped.
///
/// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
/// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
WalReceiverManager,
/// The `TaskHandle` task that executes `handle_walreceiver_connection`.
/// The `TaskHandle` task that executes [`walreceiver_connection::handle_walreceiver_connection`].
/// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
/// See the comment on [`WalReceiverManager`].
///
/// [`WalReceiverManager`]: Self::WalReceiverManager
WalReceiverConnectionHandler,
/// The task that polls the `tokio-postgres::Connection` object.
/// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
/// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
/// Spawned by task [`WalReceiverConnectionHandler`].
/// See the comment on [`WalReceiverManager`].
WalReceiverConnectionPoller,
// Garbage collection worker. One per tenant
@@ -511,13 +506,17 @@ pub async fn shutdown_tasks(
warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for {} to shut down", task.name);
let join_handle = tokio::select! {
biased;
_ = &mut join_handle => { None },
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for {} to shut down", task.name);
Some(join_handle)
}
};
if let Some(join_handle) = join_handle {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)

View File

@@ -121,7 +121,7 @@ pub mod mgr;
pub mod tasks;
pub mod upload_queue;
pub(crate) mod timeline;
mod timeline;
pub mod size;
@@ -133,7 +133,7 @@ pub use timeline::{
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
// re-export for use in remote_timeline_client.rs
// re-export for use in storage_sync.rs
pub use crate::tenant::metadata::save_metadata;
// re-export for use in walreceiver
@@ -281,7 +281,7 @@ pub enum DeleteTimelineError {
}
pub enum SetStoppingError {
AlreadyStopping(completion::Barrier),
AlreadyStopping,
Broken,
}
@@ -318,6 +318,10 @@ impl std::fmt::Display for WaitToBecomeActiveError {
}
}
pub(crate) enum ShutdownError {
AlreadyStopping,
}
struct DeletionGuard(OwnedMutexGuard<bool>);
impl DeletionGuard {
@@ -1168,7 +1172,7 @@ impl Tenant {
)
}
/// Helper for unit tests to create an empty timeline.
/// Helper for unit tests to create an emtpy timeline.
///
/// The timeline is has state value `Active` but its background loops are not running.
// This makes the various functions which anyhow::ensure! for Active state work in tests.
@@ -1455,7 +1459,7 @@ impl Tenant {
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: remote_timeline_client upload tasks that reference these layers have been cancelled
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self
@@ -1717,7 +1721,7 @@ impl Tenant {
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
@@ -1781,16 +1785,7 @@ impl Tenant {
/// - detach + ignore (freeze_and_flush == false)
///
/// This will attempt to shutdown even if tenant is broken.
///
/// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
/// If the tenant is already shutting down, we return a clone of the first shutdown call's
/// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
/// the ongoing shutdown.
async fn shutdown(
&self,
shutdown_progress: completion::Barrier,
freeze_and_flush: bool,
) -> Result<(), completion::Barrier> {
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
span::debug_assert_current_span_has_tenant_id();
// Set tenant (and its timlines) to Stoppping state.
//
@@ -1809,16 +1804,12 @@ impl Tenant {
// But the tenant background loops are joined-on in our caller.
// It's mesed up.
// we just ignore the failure to stop
match self.set_stopping(shutdown_progress).await {
match self.set_stopping().await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
}
Err(SetStoppingError::AlreadyStopping(other)) => {
// give caller the option to wait for this this shutdown
return Err(other);
}
Err(SetStoppingError::AlreadyStopping) => return Err(ShutdownError::AlreadyStopping),
};
if freeze_and_flush {
@@ -1850,7 +1841,7 @@ impl Tenant {
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
async fn set_stopping(&self) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
@@ -1862,7 +1853,7 @@ impl Tenant {
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
@@ -1877,7 +1868,7 @@ impl Tenant {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping { progress };
*current_state = TenantState::Stopping;
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
@@ -1889,9 +1880,9 @@ impl Tenant {
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping { progress } => {
TenantState::Stopping => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
err = Some(SetStoppingError::AlreadyStopping);
false
}
});
@@ -1935,7 +1926,7 @@ impl Tenant {
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
@@ -1958,7 +1949,7 @@ impl Tenant {
warn!("Tenant is already in Broken state");
}
// This is the only "expected" path, any other path is a bug.
TenantState::Stopping { .. } => {
TenantState::Stopping => {
warn!(
"Marking Stopping tenant as Broken state, reason: {}",
reason
@@ -1991,7 +1982,7 @@ impl Tenant {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { .. } | TenantState::Stopping { .. } => {
TenantState::Broken { .. } | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
return Err(WaitToBecomeActiveError::WillNotBecomeActive {
tenant_id: self.tenant_id,
@@ -3359,18 +3350,14 @@ pub mod harness {
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
(
self.try_load(&ctx, None)
self.try_load(&ctx)
.await
.expect("failed to load test tenant"),
ctx,
)
}
pub async fn try_load(
&self,
ctx: &RequestContext,
remote_storage: Option<remote_storage::GenericRemoteStorage>,
) -> anyhow::Result<Arc<Tenant>> {
pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Arc::new(Tenant::new(
@@ -3379,7 +3366,7 @@ pub mod harness {
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
remote_storage,
None,
));
tenant
.load(None, ctx)
@@ -3917,11 +3904,7 @@ mod tests {
metadata_bytes[8] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness
.try_load(&ctx, None)
.await
.err()
.expect("should fail");
let err = harness.try_load(&ctx).await.err().expect("should fail");
// get all the stack with all .context, not tonly the last one
let message = format!("{err:#}");
let expected = "Failed to parse metadata bytes from path";
@@ -4352,13 +4335,13 @@ mod tests {
// assert freeze_and_flush exercised the initdb optimization
{
let state = tline.flush_loop_state.lock().unwrap();
let timeline::FlushLoopState::Running {
expect_initdb_optimization,
initdb_optimization_count,
} = *state
else {
panic!("unexpected state: {:?}", *state);
};
let
timeline::FlushLoopState::Running {
expect_initdb_optimization,
initdb_optimization_count,
} = *state else {
panic!("unexpected state: {:?}", *state);
};
assert!(expect_initdb_optimization);
assert!(initdb_optimization_count > 0);
}

View File

@@ -442,7 +442,7 @@ where
writer: W,
///
/// `stack[0]` is the current root page, `stack.last()` is the leaf.
/// stack[0] is the current root page, stack.last() is the leaf.
///
/// We maintain the length of the stack to be always greater than zero.
/// Two exceptions are:

View File

@@ -16,7 +16,7 @@
//! Other read methods are less critical but still impact performance of background tasks.
//!
//! This data structure relies on a persistent/immutable binary search tree. See the
//! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
//! following lecture for an introduction https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
//! Summary: A persistent/immutable BST (and persistent data structures in general) allows
//! you to modify the tree in such a way that each modification creates a new "version"
//! of the tree. When you modify it, you get a new version, but all previous versions are
@@ -40,7 +40,7 @@
//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
//! to throw away most of the persistent BST and build a new one, starting from the oldest
//! LSN. See [`LayerMap::flush_updates()`].
//! LSN. See `LayerMap::flush_updates()`.
//!
mod historic_layer_coverage;

View File

@@ -122,7 +122,8 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
self.head = self
.historic
.iter()
.next_back()
.rev()
.next()
.map(|(_, v)| v.clone())
.unwrap_or_default();
}
@@ -411,7 +412,7 @@ fn test_persistent_overlapping() {
/// still be more critical.
///
/// See this for more on persistent and retroactive techniques:
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,

View File

@@ -2,7 +2,7 @@ use std::ops::Range;
// NOTE the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. But it's bugged so rpds is a
// better choice <https://github.com/neondatabase/neon/issues/3395>
// better choice https://github.com/neondatabase/neon/issues/3395
use rpds::RedBlackTreeMapSync;
/// Data structure that can efficiently:
@@ -11,7 +11,7 @@ use rpds::RedBlackTreeMapSync;
/// - insert layers in non-decreasing lsn.start order
///
/// For a detailed explanation and justification of this approach, see:
/// <https://neon.tech/blog/persistent-structures-in-neons-wal-indexing>
/// https://neon.tech/blog/persistent-structures-in-neons-wal-indexing
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
@@ -113,7 +113,8 @@ impl<Value: Clone> LayerCoverage<Value> {
pub fn query(&self, key: i128) -> Option<Value> {
self.nodes
.range(..=key)
.next_back()?
.rev()
.next()?
.1
.as_ref()
.map(|(_, v)| v.clone())

View File

@@ -24,7 +24,7 @@
//! Currently, this is not used in the system. Future refactors will ensure
//! the storage state will be recorded in this file, and the system can be
//! recovered from this file. This is tracked in
//! <https://github.com/neondatabase/neon/issues/4418>
//! https://github.com/neondatabase/neon/issues/4418
use std::io::{self, Read, Write};

View File

@@ -1,12 +1,10 @@
//! Every image of a certain timeline from [`crate::tenant::Tenant`]
//! has a metadata that needs to be stored persistently.
//!
//! Later, the file gets used in [`remote_timeline_client`] as a part of
//! Later, the file gets is used in [`crate::remote_storage::storage_sync`] as a part of
//! external storage import and export operations.
//!
//! The module contains all structs and related helper methods related to timeline metadata.
//!
//! [`remote_timeline_client`]: super::remote_timeline_client
use std::fs::{File, OpenOptions};
use std::io::Write;

View File

@@ -233,17 +233,11 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
#[instrument]
pub async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
use utils::completion;
// Prevent new tenants from being created.
let tenants_to_shut_down = {
let mut m = tenants.write().await;
let mut m = TENANTS.write().await;
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(HashMap::default());
@@ -268,41 +262,14 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
// ordering shouldn't matter for this, either we store true right away or never
let ordering = std::sync::atomic::Ordering::Relaxed;
let joined_other = std::sync::atomic::AtomicBool::new(false);
let freeze_and_flush = true;
let mut shutdown = std::pin::pin!(async {
let freeze_and_flush = true;
let res = {
let (_guard, shutdown_progress) = completion::channel();
tenant.shutdown(shutdown_progress, freeze_and_flush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
joined_other.store(true, ordering);
other_progress.wait().await;
match tenant.shutdown(freeze_and_flush).await {
Ok(()) => debug!("tenant successfully stopped"),
Err(super::ShutdownError::AlreadyStopping) => {
warn!("tenant was already shutting down")
}
});
// in practice we might not have a lot time to go, since systemd is going to
// SIGKILL us at 10s, but we can try. delete tenant might take a while, so put out
// a warning.
let warning = std::time::Duration::from_secs(5);
let mut warning = std::pin::pin!(tokio::time::sleep(warning));
tokio::select! {
_ = &mut shutdown => {},
_ = &mut warning => {
let joined_other = joined_other.load(ordering);
warn!(%joined_other, "waiting for the shutdown to complete");
shutdown.await;
}
};
debug!("tenant successfully stopped");
}
}
.instrument(info_span!("shutdown", %tenant_id)),
);
@@ -446,15 +413,6 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
) -> Result<(), TenantStateError> {
detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await
}
async fn detach_tenant0(
conf: &'static PageServerConf,
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
) -> Result<(), TenantStateError> {
let local_files_cleanup_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -467,8 +425,7 @@ async fn detach_tenant0(
};
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, local_files_cleanup_operation(tenant_id))
.await;
remove_tenant_from_memory(tenant_id, local_files_cleanup_operation(tenant_id)).await;
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
@@ -515,15 +472,7 @@ pub async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
ignore_tenant0(conf, &TENANTS, tenant_id).await
}
async fn ignore_tenant0(
conf: &'static PageServerConf,
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
remove_tenant_from_memory(tenants, tenant_id, async {
remove_tenant_from_memory(tenant_id, async {
let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
fs::File::create(&ignore_mark_file)
.await
@@ -648,21 +597,18 @@ where
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
use utils::completion;
// It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races.
// The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
// tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
// avoid holding the lock for the entire process.
let tenant = {
tenants
TENANTS
.write()
.await
.get(&tenant_id)
@@ -670,20 +616,14 @@ where
.ok_or(TenantStateError::NotFound(tenant_id))?
};
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, freeze_and_flush).await {
match tenant.shutdown(freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
Err(super::ShutdownError::AlreadyStopping) => {
return Err(TenantStateError::IsStopping(tenant_id))
}
}
@@ -692,14 +632,14 @@ where
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
{
Ok(hook_value) => {
let mut tenants_accessor = tenants.write().await;
let mut tenants_accessor = TENANTS.write().await;
if tenants_accessor.remove(&tenant_id).is_none() {
warn!("Tenant {tenant_id} got removed from memory before operation finished");
}
Ok(hook_value)
}
Err(e) => {
let tenants_accessor = tenants.read().await;
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string()).await;
@@ -816,109 +756,3 @@ pub async fn immediate_compact(
Ok(wait_task_done)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{info_span, Instrument};
use super::{super::harness::TenantHarness, TenantsMap};
#[tokio::test(start_paused = true)]
async fn shutdown_joins_remove_tenant_from_memory() {
// the test is a bit ugly with the lockstep together with spawned tasks. the aim is to make
// sure `shutdown_all_tenants0` per-tenant processing joins in any active
// remove_tenant_from_memory calls, which is enforced by making the operation last until
// we've ran `shutdown_all_tenants0` for a long time.
let (t, _ctx) = TenantHarness::create("shutdown_joins_detach")
.unwrap()
.load()
.await;
// harness loads it to active, which is forced and nothing is running on the tenant
let id = t.tenant_id();
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, t.clone())]);
let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
let (until_cleanup_started, cleanup_started) = utils::completion::channel();
// start a "detaching operation", which will take a while, until can_complete_cleanup
let cleanup_task = {
let jh = tokio::spawn({
let tenants = tenants.clone();
async move {
let cleanup = async move {
drop(until_cleanup_started);
can_complete_cleanup.wait().await;
anyhow::Ok(())
};
super::remove_tenant_from_memory(&tenants, id, cleanup).await
}
.instrument(info_span!("foobar", tenant_id = %id))
});
// now the long cleanup should be in place, with the stopping state
cleanup_started.wait().await;
jh
};
let mut cleanup_progress = std::pin::pin!(t
.shutdown(utils::completion::Barrier::default(), false)
.await
.unwrap_err()
.wait());
let mut shutdown_task = {
let (until_shutdown_started, shutdown_started) = utils::completion::channel();
let shutdown_task = tokio::spawn(async move {
drop(until_shutdown_started);
super::shutdown_all_tenants0(&tenants).await;
});
shutdown_started.wait().await;
shutdown_task
};
// if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always
// get to complete within timeout and fail the test. it is expected to continue awaiting
// until completion or SIGKILL during normal shutdown.
//
// the timeout is long to cover anything that shutdown_task could be doing, but it is
// handled instantly because we use tokio's time pausing in this test. 100s is much more than
// what we get from systemd on shutdown (10s).
let long_time = std::time::Duration::from_secs(100);
tokio::select! {
_ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"),
_ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"),
_ = tokio::time::sleep(long_time) => {},
}
// allow the remove_tenant_from_memory and thus eventually the shutdown to continue
drop(until_cleanup_completed);
let (je, ()) = tokio::join!(shutdown_task, cleanup_progress);
je.expect("Tenant::shutdown shutdown not have panicked");
cleanup_task
.await
.expect("no panicking")
.expect("remove_tenant_from_memory failed");
futures::future::poll_immediate(
t.shutdown(utils::completion::Barrier::default(), false)
.await
.unwrap_err()
.wait(),
)
.await
.expect("the stopping progress must still be complete");
}
}

View File

@@ -135,7 +135,7 @@
//! - Initiate upload queue with that [`IndexPart`].
//! - Reschedule all lost operations by comparing the local filesystem state
//! and remote state as per [`IndexPart`]. This is done in
//! [`Tenant::timeline_init_and_sync`] and [`Timeline::reconcile_with_remote`].
//! [`Timeline::timeline_init_and_sync`] and [`Timeline::reconcile_with_remote`].
//!
//! Note that if we crash during file deletion between the index update
//! that removes the file from the list of files, and deleting the remote file,
@@ -163,8 +163,8 @@
//! - download their remote [`IndexPart`]s
//! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with its `IndexPart`
//! - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
//! for layers that are referenced by `IndexPart` but not present locally
//! - create [`RemoteLayer`] instances for layers that are referenced by `IndexPart`
//! but not present locally
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
@@ -198,8 +198,6 @@
//! in remote storage.
//! But note that we don't test any of this right now.
//!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::reconcile_with_remote`]: super::Timeline::reconcile_with_remote
mod delete;
mod download;
@@ -842,16 +840,14 @@ impl RemoteTimelineClient {
let remaining: Vec<RemotePath> = remaining
.into_iter()
.filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
.inspect(|path| {
if let Some(name) = path.object_name() {
info!(%name, "deleting a file not referenced from index_part.json");
} else {
warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
}
})
.collect();
if !remaining.is_empty() {
warn!(
"Found {} files not bound to index_file.json, proceeding with their deletion",
remaining.len()
);
warn!("About to remove {} files", remaining.len());
self.storage_impl.delete_objects(&remaining).await?;
}
@@ -860,7 +856,7 @@ impl RemoteTimelineClient {
debug!("deleting index part");
self.storage_impl.delete(&index_file_path).await?;
info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
info!(deletions_queued, "done deleting, including index_part.json");
Ok(())
}

View File

@@ -62,11 +62,12 @@ pub(super) async fn upload_timeline_layer<'a>(
let source_file = match source_file_res {
Ok(source_file) => source_file,
Err(e) if e.kind() == ErrorKind::NotFound => {
// If we encounter this arm, it wasn't intended, but it's also not
// a big problem, if it's because the file was deleted before an
// upload. However, a nonexistent file can also be indicative of
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
// In some situations we might run into the underlying file being deleted by
// e.g. compaction before the uploader gets to it. In that instance, we don't
// want to retry the error: a deleted file won't come back. In theory, the
// file might not have been written in the first place, which also indicates
// a bug. Still log the situation so that we can keep an eye on it.
// See https://github.com/neondatabase/neon/issues/4526
info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}

View File

@@ -110,11 +110,11 @@ pub struct TimelineInputs {
///
/// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
/// is updated on-demand, during the start of this calculation and separate from the
/// [`TimelineInputs::latest_gc_cutoff`].
/// [`Timeline::latest_gc_cutoff`].
///
/// For timelines in general:
///
/// ```text
/// ```ignore
/// 0-----|---------|----|------------| · · · · · |·> lsn
/// initdb_lsn branchpoints* next_gc_cutoff latest
/// ```

View File

@@ -11,7 +11,10 @@ pub(crate) static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<MultiNameExtractor<
#[cfg(debug_assertions)]
#[track_caller]
pub(crate) fn debug_assert_current_span_has_tenant_id() {
if let Err(missing) = check_fields_present!([&*TENANT_ID_EXTRACTOR]) {
panic!("missing extractors: {missing:?}")
if let Err(missing) = check_fields_present([&*TENANT_ID_EXTRACTOR]) {
panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
)
}
}

View File

@@ -162,9 +162,6 @@ impl LayerAccessStats {
/// The caller is responsible for recording a residence event
/// using [`record_residence_event`] before calling `latest_activity`.
/// If they don't, [`latest_activity`] will return `None`.
///
/// [`record_residence_event`]: Self::record_residence_event
/// [`latest_activity`]: Self::latest_activity
pub(crate) fn empty_will_record_residence_event_later() -> Self {
LayerAccessStats(Mutex::default())
}
@@ -172,9 +169,6 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
///
/// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &LayerManager,
status: LayerResidenceStatus,
@@ -193,8 +187,6 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
///
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn clone_for_residence_change(
&self,
layer_map_lock_held_witness: &LayerManager,
@@ -302,13 +294,11 @@ impl LayerAccessStats {
/// implementation error. This function logs a rate-limited warning in that case.
///
/// TODO: use type system to avoid the need for `fallback`.
/// The approach in <https://github.com/neondatabase/neon/pull/3775>
/// The approach in https://github.com/neondatabase/neon/pull/3775
/// could be used to enforce that a residence event is recorded
/// before a layer is added to the layer map. We could also have
/// a layer wrapper type that holds the LayerAccessStats, and ensure
/// that that type can only be produced by inserting into the layer map.
///
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
@@ -333,7 +323,7 @@ impl LayerAccessStats {
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`](super::layer_map::LayerMap).
/// required by [`LayerMap`].
///
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
@@ -380,10 +370,10 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
}
/// Returned by [`PersistentLayer::iter`]
/// Returned by [`Layer::iter`]
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i + Send>;
/// Returned by [`PersistentLayer::key_iter`]
/// Returned by [`Layer::key_iter`]
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
/// Get a layer descriptor from a layer.
@@ -442,10 +432,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc {
None
}
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}

View File

@@ -7,18 +7,14 @@
//! must be page images or WAL records with the 'will_init' flag set, so that
//! they can be replayed without referring to an older page version.
//!
//! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
//! The delta files are stored in timelines/<timeline_id> directory. Currently,
//! there are no subdirectories, and each delta file is named like this:
//!
//! ```text
//! <key start>-<key end>__<start LSN>-<end LSN>
//! ```
//!
//! For example:
//!
//! ```text
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//! ```
//!
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
@@ -51,7 +47,6 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing::*;
use utils::{
@@ -415,10 +410,6 @@ impl AsLayerDesc for DeltaLayer {
}
impl PersistentLayer for DeltaLayer {
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
Some(self)
}
fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}
@@ -809,7 +800,7 @@ impl DeltaLayerWriterInner {
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// As described in https://github.com/neondatabase/neon/issues/2650, it's
/// possible for the writer to drop before `finish` is actually called. So this
/// could lead to odd temporary files in the directory, exhausting file system.
/// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`

View File

@@ -57,9 +57,8 @@ impl Ord for DeltaFileName {
/// Represents the filename of a DeltaLayer
///
/// ```text
/// <key start>-<key end>__<LSN start>-<LSN end>
/// ```
///
impl DeltaFileName {
///
/// Parse a string as a delta file name. Returns None if the filename does not
@@ -163,9 +162,7 @@ impl ImageFileName {
///
/// Represents the filename of an ImageLayer
///
/// ```text
/// <key start>-<key end>__<LSN>
/// ```
impl ImageFileName {
///
/// Parse a string as an image file name. Returns None if the filename does not

View File

@@ -7,15 +7,11 @@
//! timelines/<timeline_id> directory. Currently, there are no
//! subdirectories, and each image layer file is named like this:
//!
//! ```text
//! <key start>-<key end>__<LSN>
//! ```
//!
//! For example:
//!
//! ```text
//! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
//! ```
//!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
@@ -664,7 +660,7 @@ impl ImageLayerWriterInner {
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// As described in https://github.com/neondatabase/neon/issues/2650, it's
/// possible for the writer to drop before `finish` is actually called. So this
/// could lead to odd temporary files in the directory, exhausting file system.
/// This structure wraps `ImageLayerWriterInner` and also contains `Drop`

View File

@@ -25,7 +25,7 @@ use super::{
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`DeltaLayer`](super::DeltaLayer).
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed download remote layers and during which, it gets replaced with a
@@ -50,8 +50,6 @@ pub struct RemoteLayer {
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
/// a possible fast loop between `Timeline::get_reconstruct_data` and
/// `Timeline::download_remote_layer`, which also logs.
///
/// [`ongoing_download`]: Self::ongoing_download
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}

View File

@@ -122,12 +122,12 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
info!("received cancellation request during idling");
break;
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
}
@@ -196,12 +196,12 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
warn_when_period_overrun(started_at.elapsed(), period, "gc");
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
info!("received cancellation request during idling");
break;
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
}
@@ -263,9 +263,9 @@ pub(crate) async fn random_init_delay(
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(d) => Ok(()),
}
}

View File

@@ -24,7 +24,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::collections::{BinaryHeap, HashMap};
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -183,7 +183,7 @@ pub struct Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
/// See [`storage_sync`] module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,
// What page versions do we hold in the repository? If we get a
@@ -240,8 +240,6 @@ pub struct Timeline {
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
///
/// [`Tenant::delete_timeline`]: super::Tenant::delete_timeline
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
@@ -981,12 +979,8 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
let Some(remote_layer) = layer.downcast_remote_layer() else {
return Ok(Some(false));
};
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
@@ -995,12 +989,10 @@ impl Timeline {
Ok(Some(true))
}
/// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1011,25 +1003,25 @@ impl Timeline {
.evict_layer_batch(remote_client, &[local_layer], cancel)
.await?;
assert_eq!(results.len(), 1);
let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
let result: Option<anyhow::Result<bool>> = results.into_iter().next().unwrap();
match result {
None => anyhow::bail!("task_mgr shutdown requested"),
Some(Ok(())) => Ok(Some(true)),
Some(Err(e)) => Err(anyhow::Error::new(e)),
Some(Ok(b)) => Ok(Some(b)),
Some(Err(e)) => Err(e),
}
}
/// Evict a batch of layers.
///
/// GenericRemoteStorage reference is required as a (witness)[witness_article] for "remote storage is configured."
/// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured."
///
/// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
pub(crate) async fn evict_layers(
/// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
"GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
);
@@ -1064,7 +1056,7 @@ impl Timeline {
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
remote_client
@@ -1110,9 +1102,11 @@ impl Timeline {
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>,
layer_mgr: &mut LayerManager,
) -> Result<(), EvictionError> {
) -> anyhow::Result<bool> {
if local_layer.is_remote_layer() {
return Err(EvictionError::CannotEvictRemoteLayer);
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later
return Ok(false);
}
let layer_file_size = local_layer.file_size();
@@ -1121,22 +1115,13 @@ impl Timeline {
.local_path()
.expect("local layer should have a local path")
.metadata()
// when the eviction fails because we have already deleted the layer in compaction for
// example, a NotFound error bubbles up from here.
.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
EvictionError::FileNotFound
} else {
EvictionError::StatFailed(e)
}
})?
.context("get local layer file stat")?
.modified()
.map_err(EvictionError::StatFailed)?;
.context("get mtime of layer file")?;
let local_layer_residence_duration =
match SystemTime::now().duration_since(local_layer_mtime) {
Err(e) => {
warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
warn!("layer mtime is in the future: {}", e);
None
}
Ok(delta) => Some(delta),
@@ -1167,65 +1152,54 @@ impl Timeline {
assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
layer_mgr
.replace_and_verify(local_layer.clone(), new_remote_layer)
.map_err(EvictionError::LayerNotFound)?;
let succeed = match layer_mgr.replace_and_verify(local_layer.clone(), new_remote_layer) {
Ok(()) => {
if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}");
}
// Always decrement the physical size gauge, even if we failed to delete the file.
// Rationale: we already replaced the layer with a remote layer in the layer map,
// and any subsequent download_remote_layer will
// 1. overwrite the file on disk and
// 2. add the downloaded size to the resident size gauge.
//
// If there is no re-download, and we restart the pageserver, then load_layer_map
// will treat the file as a local layer again, count it towards resident size,
// and it'll be like the layer removal never happened.
// The bump in resident size is perhaps unexpected but overall a robust behavior.
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
if let Err(e) = local_layer.delete_resident_layer_file() {
// this should never happen, because of layer_removal_cs usage and above stat
// access for mtime
error!("failed to remove layer file on evict after replacement: {e:#?}");
}
// Always decrement the physical size gauge, even if we failed to delete the file.
// Rationale: we already replaced the layer with a remote layer in the layer map,
// and any subsequent download_remote_layer will
// 1. overwrite the file on disk and
// 2. add the downloaded size to the resident size gauge.
//
// If there is no re-download, and we restart the pageserver, then load_layer_map
// will treat the file as a local layer again, count it towards resident size,
// and it'll be like the layer removal never happened.
// The bump in resident size is perhaps unexpected but overall a robust behavior.
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
self.metrics.evictions.inc();
self.metrics.evictions.inc();
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
info!(layer=%local_layer, "evicted layer after unknown residence period");
}
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
info!(layer=%local_layer, "evicted layer after unknown residence period");
}
true
}
Err(err) => {
if cfg!(debug_assertions) {
panic!("failed to replace: {err}, evicted: {local_layer:?}");
} else {
error!(evicted=?local_layer, "failed to replace: {err}");
}
false
}
};
Ok(())
Ok(succeed)
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum EvictionError {
#[error("cannot evict a remote layer")]
CannotEvictRemoteLayer,
/// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same
/// locks, so they got to execute before the eviction.
#[error("file backing the layer has been removed already")]
FileNotFound,
#[error("stat failed")]
StatFailed(#[source] std::io::Error),
/// In practice, this can be a number of things, but lets assume it means only this.
///
/// This case includes situations such as the Layer was evicted and redownloaded in between,
/// because the file existed before an replacement attempt was made but now the Layers are
/// different objects in memory.
#[error("layer was no longer part of LayerMap")]
LayerNotFound(#[source] anyhow::Error),
}
/// Number of times we will compute partition within a checkpoint distance.
const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
@@ -1795,7 +1769,7 @@ impl Timeline {
/// 3. Schedule upload of local-only layer files (which will then also update the remote
/// IndexPart to include the new layer files).
///
/// Refer to the [`remote_timeline_client`] module comment for more context.
/// Refer to the `storage_sync` module comment for more context.
///
/// # TODO
/// May be a bit cleaner to do things based on populated remote client,
@@ -2628,9 +2602,7 @@ impl Timeline {
guard.layer_map().frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
let Some(layer_to_flush) = layer_to_flush else {
break Ok(());
};
let Some(layer_to_flush) = layer_to_flush else { break Ok(()) };
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
error!("could not flush frozen layer: {err:?}");
break Err(err);
@@ -2742,12 +2714,6 @@ impl Timeline {
HashMap::from([(delta_path, metadata)])
};
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
// race situation.
// See https://github.com/neondatabase/neon/issues/4526
pausable_failpoint!("flush-frozen-before-sync");
// The new on-disk layers are now in the layer map. We can remove the
@@ -3158,7 +3124,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<Arc<DeltaLayer>>,
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
}
@@ -3307,8 +3273,6 @@ impl Timeline {
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
///
/// [`compact_inner`]: Self::compact_inner
fn compact_level0_phase1(
self: Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
@@ -3336,37 +3300,6 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}
// This failpoint is used together with `test_duplicate_layers` integration test.
// It returns the compaction result exactly the same layers as input to compaction.
// We want to ensure that this will not cause any problem when updating the layer map
// after the compaction is finished.
//
// Currently, there are two rare edge cases that will cause duplicated layers being
// inserted.
// 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
// is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
// map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
// point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
// and this causes an overwrite. This is acceptable because the content is the same, and we should do a
// layer replace instead of the normal remove / upload process.
// 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
// size length. Compaction will likely create the same set of n files afterwards.
//
// This failpoint is a superset of both of the cases.
fail_point!("compact-level0-phase1-return-same", |_| {
println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
Ok(CompactLevel0Phase1Result {
new_layers: level0_deltas
.iter()
.map(|x| x.clone().downcast_delta_layer().unwrap())
.collect(),
deltas_to_compact: level0_deltas
.iter()
.map(|x| x.layer_desc().clone().into())
.collect(),
})
});
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
@@ -3625,9 +3558,7 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(Arc::new(
writer.take().unwrap().finish(prev_key.unwrap().next())?,
));
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
if contains_hole {
@@ -3665,7 +3596,7 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
@@ -3761,7 +3692,7 @@ impl Timeline {
}
// Before deleting any layers, we need to wait for their upload ops to finish.
// See remote_timeline_client module level comment on consistency.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
@@ -3774,11 +3705,6 @@ impl Timeline {
let mut guard = self.layers.write().await;
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
// In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
// We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
// now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
let mut duplicated_layers = HashSet::new();
let mut insert_layers = Vec::new();
let mut remove_layers = Vec::new();
@@ -3805,33 +3731,21 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
l.access_stats().record_residence_event(
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&guard,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
let l = l as Arc<dyn PersistentLayer>;
if guard.contains(&l) {
duplicated_layers.insert(l.layer_desc().key());
} else {
if LayerMap::is_l0(l.layer_desc()) {
return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
}
insert_layers.push(l);
}
insert_layers.push(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for ldesc in deltas_to_compact {
if duplicated_layers.contains(&ldesc.key()) {
// skip duplicated layers, they will not be removed; we have already overwritten them
// with new layers in the compaction phase 1.
continue;
}
layer_names_to_delete.push(ldesc.filename());
remove_layers.push(guard.get_from_desc(&ldesc));
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
remove_layers.push(guard.get_from_desc(&l));
}
guard.finish_compact_l0(
@@ -4590,7 +4504,6 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read().await;
let layers = guard.layer_map();
@@ -4760,179 +4673,3 @@ pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
left == right
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
use super::{EvictionError, Timeline};
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness =
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
let remote_storage = {
// this is never used for anything, because of how the create_test_timeline works, but
// it is with us in spirit and a Some.
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};
let path = harness.conf.workdir.join("localfs");
std::fs::create_dir_all(&path).unwrap();
let config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
storage: RemoteStorageKind::LocalFs(path),
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let rc = timeline
.remote_client
.clone()
.expect("just configured this");
let layer = find_some_layer(&timeline).await;
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = {
let cancel = cancel.clone();
async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
}
};
let second = async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
};
let (first, second) = tokio::join!(first, second);
let (first, second) = (only_one(first), only_one(second));
match (first, second) {
(Ok(()), Err(EvictionError::FileNotFound))
| (Err(EvictionError::FileNotFound), Ok(())) => {
// one of the evictions gets to do it,
// other one gets FileNotFound. all is good.
}
other => unreachable!("unexpected {:?}", other),
}
}
#[tokio::test]
async fn layer_eviction_aba_fails() {
let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
let remote_storage = {
// this is never used for anything, because of how the create_test_timeline works, but
// it is with us in spirit and a Some.
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};
let path = harness.conf.workdir.join("localfs");
std::fs::create_dir_all(&path).unwrap();
let config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
storage: RemoteStorageKind::LocalFs(path),
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
let rc = timeline.remote_client.clone().unwrap();
// TenantHarness allows uploads to happen given GenericRemoteStorage is configured
let layer = find_some_layer(&timeline).await;
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = {
let cancel = cancel.clone();
async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
}
};
// lets imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
let second = {
let cancel = cancel.clone();
async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
}
};
// while it's stuck, we evict and end up redownloading it
only_one(first.await).expect("eviction succeeded");
let layer = find_some_layer(&timeline).await;
let layer = layer.downcast_remote_layer().unwrap();
timeline.download_remote_layer(layer).await.unwrap();
let res = only_one(second.await);
assert!(
matches!(res, Err(EvictionError::LayerNotFound(_))),
"{res:?}"
);
// no more specific asserting, outside of preconds this is the only valid replacement
// failure
}
fn any_context() -> crate::context::RequestContext {
use crate::context::*;
use crate::task_mgr::*;
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
}
fn only_one<T>(mut input: Vec<Option<T>>) -> T {
assert_eq!(1, input.len());
input
.pop()
.expect("length just checked")
.expect("no cancellation")
}
async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()
.iter_historic_layers()
.next()
.expect("must find one layer to evict");
layers.get_from_desc(&desc)
}
}

View File

@@ -30,7 +30,6 @@ use crate::{
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -101,11 +100,11 @@ impl Timeline {
match cf {
ControlFlow::Break(()) => break,
ControlFlow::Continue(sleep_until) => {
if tokio::time::timeout_at(sleep_until, cancel.cancelled())
.await
.is_ok()
{
break;
tokio::select! {
_ = cancel.cancelled() => {
break;
}
_ = tokio::time::sleep_until(sleep_until) => { }
}
}
}
@@ -271,22 +270,20 @@ impl Timeline {
None => {
stats.skipped_for_shutdown += 1;
}
Some(Ok(())) => {
Some(Ok(true)) => {
debug!("evicted layer {l:?}");
stats.evicted += 1;
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
Some(Ok(false)) => {
debug!("layer is not evictable: {l:?}");
stats.not_evictable += 1;
}
Some(Err(EvictionError::FileNotFound)) => {
// compaction/gc removed the file while we were waiting on layer_removal_cs
stats.not_evictable += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(layer = %l, "failed to evict layer: {e}");
stats.not_evictable += 1;
Some(Err(e)) => {
// This variant is the case where an unexpected error happened during eviction.
// Expected errors that result in non-eviction are `Some(Ok(false))`.
// So, dump Debug here to gather as much info as possible in this rare case.
warn!("failed to evict layer {l:?}: {e:?}");
stats.errors += 1;
}
}
}

View File

@@ -295,10 +295,6 @@ impl LayerManager {
Ok(())
}
pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
self.layer_fmgr.contains(layer)
}
}
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
@@ -323,10 +319,6 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
}
}
pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
pub(crate) fn new() -> Self {
Self(HashMap::new())
}

View File

@@ -14,7 +14,10 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
&*crate::tenant::span::TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
];
if let Err(missing) = check_fields_present!(fields) {
panic!("missing extractors: {missing:?}")
if let Err(missing) = check_fields_present(fields) {
panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
)
}
}

View File

@@ -6,7 +6,7 @@
//! Current connection state is tracked too, to ensure it's not getting stale.
//!
//! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader,
//! then a (re)connection happens, if necessary.
//! then a [re]connection happens, if necessary.
//! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};

View File

@@ -1,4 +1,4 @@
comment = '** Deprecated ** Please use pg_embedding instead'
comment = 'hnsw index'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true

View File

@@ -59,7 +59,8 @@ neon_download_extension_file_http(const char *filename, bool is_library)
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
// NOTE: 15L may be insufficient time for large extensions like postgis
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L /* seconds */);
if (curl)
{

View File

@@ -0,0 +1 @@

168
poetry.lock generated
View File

@@ -1,9 +1,10 @@
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
# This file is automatically @generated by Poetry and should not be changed by hand.
[[package]]
name = "aiohttp"
version = "3.7.4"
description = "Async http client/server framework (asyncio)"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -61,6 +62,7 @@ speedups = ["aiodns", "brotlipy", "cchardet"]
name = "aiopg"
version = "1.3.4"
description = "Postgres integration with asyncio."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -79,6 +81,7 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
name = "allure-pytest"
version = "2.13.2"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -94,6 +97,7 @@ pytest = ">=4.5.0"
name = "allure-python-commons"
version = "2.13.2"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -109,6 +113,7 @@ pluggy = ">=0.4.0"
name = "async-timeout"
version = "3.0.1"
description = "Timeout context manager for asyncio programs"
category = "main"
optional = false
python-versions = ">=3.5.3"
files = [
@@ -120,6 +125,7 @@ files = [
name = "asyncpg"
version = "0.27.0"
description = "An asyncio PostgreSQL driver"
category = "main"
optional = false
python-versions = ">=3.7.0"
files = [
@@ -170,6 +176,7 @@ test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"]
name = "attrs"
version = "21.4.0"
description = "Classes Without Boilerplate"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -187,6 +194,7 @@ tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy"
name = "aws-sam-translator"
version = "1.48.0"
description = "AWS SAM Translator is a library that transform SAM templates into AWS CloudFormation templates"
category = "main"
optional = false
python-versions = ">=3.7, <=4.0, !=4.0"
files = [
@@ -196,7 +204,7 @@ files = [
]
[package.dependencies]
boto3 = ">=1.19.5,<2.dev0"
boto3 = ">=1.19.5,<2.0.0"
jsonschema = ">=3.2,<4.0"
[package.extras]
@@ -206,6 +214,7 @@ dev = ["black (==20.8b1)", "boto3 (>=1.23,<2)", "click (>=7.1,<8.0)", "coverage
name = "aws-xray-sdk"
version = "2.10.0"
description = "The AWS X-Ray SDK for Python (the SDK) enables Python developers to record and emit information from within their applications to the AWS X-Ray service."
category = "main"
optional = false
python-versions = "*"
files = [
@@ -221,6 +230,7 @@ wrapt = "*"
name = "backoff"
version = "2.2.1"
description = "Function decoration for backoff and retry"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
files = [
@@ -232,6 +242,7 @@ files = [
name = "black"
version = "23.3.0"
description = "The uncompromising code formatter."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -281,6 +292,7 @@ uvloop = ["uvloop (>=0.15.2)"]
name = "boto3"
version = "1.26.16"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -300,6 +312,7 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
name = "boto3-stubs"
version = "1.26.16"
description = "Type annotations for boto3 1.26.16 generated with mypy-boto3-builder 7.11.11"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -644,6 +657,7 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"]
name = "botocore"
version = "1.29.16"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -663,6 +677,7 @@ crt = ["awscrt (==0.14.0)"]
name = "botocore-stubs"
version = "1.27.38"
description = "Type annotations for botocore 1.27.38 generated with mypy-boto3-builder 7.10.1"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -677,6 +692,7 @@ typing-extensions = ">=4.1.0"
name = "certifi"
version = "2022.12.7"
description = "Python package for providing Mozilla's CA Bundle."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -688,6 +704,7 @@ files = [
name = "cffi"
version = "1.15.1"
description = "Foreign Function Interface for Python calling C code."
category = "main"
optional = false
python-versions = "*"
files = [
@@ -764,6 +781,7 @@ pycparser = "*"
name = "cfn-lint"
version = "0.61.3"
description = "Checks CloudFormation templates for practices and behaviour that could potentially be improved"
category = "main"
optional = false
python-versions = ">=3.6, <=4.0, !=4.0"
files = [
@@ -785,6 +803,7 @@ sarif-om = ">=1.0.4,<1.1.0"
name = "chardet"
version = "3.0.4"
description = "Universal encoding detector for Python 2 and 3"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -796,6 +815,7 @@ files = [
name = "charset-normalizer"
version = "2.1.0"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
category = "main"
optional = false
python-versions = ">=3.6.0"
files = [
@@ -810,6 +830,7 @@ unicode-backport = ["unicodedata2"]
name = "click"
version = "8.1.3"
description = "Composable command line interface toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -824,6 +845,7 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""}
name = "colorama"
version = "0.4.5"
description = "Cross-platform colored terminal text."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -833,34 +855,31 @@ files = [
[[package]]
name = "cryptography"
version = "41.0.2"
version = "41.0.0"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-41.0.2-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:01f1d9e537f9a15b037d5d9ee442b8c22e3ae11ce65ea1f3316a41c78756b711"},
{file = "cryptography-41.0.2-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:079347de771f9282fbfe0e0236c716686950c19dee1b76240ab09ce1624d76d7"},
{file = "cryptography-41.0.2-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:439c3cc4c0d42fa999b83ded80a9a1fb54d53c58d6e59234cfe97f241e6c781d"},
{file = "cryptography-41.0.2-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f14ad275364c8b4e525d018f6716537ae7b6d369c094805cae45300847e0894f"},
{file = "cryptography-41.0.2-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:84609ade00a6ec59a89729e87a503c6e36af98ddcd566d5f3be52e29ba993182"},
{file = "cryptography-41.0.2-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:49c3222bb8f8e800aead2e376cbef687bc9e3cb9b58b29a261210456a7783d83"},
{file = "cryptography-41.0.2-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:d73f419a56d74fef257955f51b18d046f3506270a5fd2ac5febbfa259d6c0fa5"},
{file = "cryptography-41.0.2-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:2a034bf7d9ca894720f2ec1d8b7b5832d7e363571828037f9e0c4f18c1b58a58"},
{file = "cryptography-41.0.2-cp37-abi3-win32.whl", hash = "sha256:d124682c7a23c9764e54ca9ab5b308b14b18eba02722b8659fb238546de83a76"},
{file = "cryptography-41.0.2-cp37-abi3-win_amd64.whl", hash = "sha256:9c3fe6534d59d071ee82081ca3d71eed3210f76ebd0361798c74abc2bcf347d4"},
{file = "cryptography-41.0.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:a719399b99377b218dac6cf547b6ec54e6ef20207b6165126a280b0ce97e0d2a"},
{file = "cryptography-41.0.2-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:182be4171f9332b6741ee818ec27daff9fb00349f706629f5cbf417bd50e66fd"},
{file = "cryptography-41.0.2-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:7a9a3bced53b7f09da251685224d6a260c3cb291768f54954e28f03ef14e3766"},
{file = "cryptography-41.0.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:f0dc40e6f7aa37af01aba07277d3d64d5a03dc66d682097541ec4da03cc140ee"},
{file = "cryptography-41.0.2-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:674b669d5daa64206c38e507808aae49904c988fa0a71c935e7006a3e1e83831"},
{file = "cryptography-41.0.2-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:7af244b012711a26196450d34f483357e42aeddb04128885d95a69bd8b14b69b"},
{file = "cryptography-41.0.2-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:9b6d717393dbae53d4e52684ef4f022444fc1cce3c48c38cb74fca29e1f08eaa"},
{file = "cryptography-41.0.2-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:192255f539d7a89f2102d07d7375b1e0a81f7478925b3bc2e0549ebf739dae0e"},
{file = "cryptography-41.0.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:f772610fe364372de33d76edcd313636a25684edb94cee53fd790195f5989d14"},
{file = "cryptography-41.0.2-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:b332cba64d99a70c1e0836902720887fb4529ea49ea7f5462cf6640e095e11d2"},
{file = "cryptography-41.0.2-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:9a6673c1828db6270b76b22cc696f40cde9043eb90373da5c2f8f2158957f42f"},
{file = "cryptography-41.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:342f3767e25876751e14f8459ad85e77e660537ca0a066e10e75df9c9e9099f0"},
{file = "cryptography-41.0.2.tar.gz", hash = "sha256:7d230bf856164de164ecb615ccc14c7fc6de6906ddd5b491f3af90d3514c925c"},
{file = "cryptography-41.0.0-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:3c5ef25d060c80d6d9f7f9892e1d41bb1c79b78ce74805b8cb4aa373cb7d5ec8"},
{file = "cryptography-41.0.0-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:8362565b3835ceacf4dc8f3b56471a2289cf51ac80946f9087e66dc283a810e0"},
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3680248309d340fda9611498a5319b0193a8dbdb73586a1acf8109d06f25b92d"},
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84a165379cb9d411d58ed739e4af3396e544eac190805a54ba2e0322feb55c46"},
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:4ab14d567f7bbe7f1cdff1c53d5324ed4d3fc8bd17c481b395db224fb405c237"},
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:9f65e842cb02550fac96536edb1d17f24c0a338fd84eaf582be25926e993dde4"},
{file = "cryptography-41.0.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:b7f2f5c525a642cecad24ee8670443ba27ac1fab81bba4cc24c7b6b41f2d0c75"},
{file = "cryptography-41.0.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:7d92f0248d38faa411d17f4107fc0bce0c42cae0b0ba5415505df72d751bf62d"},
{file = "cryptography-41.0.0-cp37-abi3-win32.whl", hash = "sha256:34d405ea69a8b34566ba3dfb0521379b210ea5d560fafedf9f800a9a94a41928"},
{file = "cryptography-41.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:344c6de9f8bda3c425b3a41b319522ba3208551b70c2ae00099c205f0d9fd3be"},
{file = "cryptography-41.0.0-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:88ff107f211ea696455ea8d911389f6d2b276aabf3231bf72c8853d22db755c5"},
{file = "cryptography-41.0.0-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:b846d59a8d5a9ba87e2c3d757ca019fa576793e8758174d3868aecb88d6fc8eb"},
{file = "cryptography-41.0.0-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:f5d0bf9b252f30a31664b6f64432b4730bb7038339bd18b1fafe129cfc2be9be"},
{file = "cryptography-41.0.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:5c1f7293c31ebc72163a9a0df246f890d65f66b4a40d9ec80081969ba8c78cc9"},
{file = "cryptography-41.0.0-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:bf8fc66012ca857d62f6a347007e166ed59c0bc150cefa49f28376ebe7d992a2"},
{file = "cryptography-41.0.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a4fc68d1c5b951cfb72dfd54702afdbbf0fb7acdc9b7dc4301bbf2225a27714d"},
{file = "cryptography-41.0.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:14754bcdae909d66ff24b7b5f166d69340ccc6cb15731670435efd5719294895"},
{file = "cryptography-41.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:0ddaee209d1cf1f180f1efa338a68c4621154de0afaef92b89486f5f96047c55"},
{file = "cryptography-41.0.0.tar.gz", hash = "sha256:6b71f64beeea341c9b4f963b48ee3b62d62d57ba93eb120e1196b31dc1025e78"},
]
[package.dependencies]
@@ -880,6 +899,7 @@ test-randomorder = ["pytest-randomly"]
name = "docker"
version = "4.2.2"
description = "A Python library for the Docker Engine API."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -901,6 +921,7 @@ tls = ["cryptography (>=1.3.4)", "idna (>=2.0.0)", "pyOpenSSL (>=17.5.0)"]
name = "ecdsa"
version = "0.18.0"
description = "ECDSA cryptographic signature library (pure python)"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -919,6 +940,7 @@ gmpy2 = ["gmpy2"]
name = "exceptiongroup"
version = "1.1.1"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -933,6 +955,7 @@ test = ["pytest (>=6)"]
name = "execnet"
version = "1.9.0"
description = "execnet: rapid multi-Python deployment"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -947,6 +970,7 @@ testing = ["pre-commit"]
name = "flask"
version = "2.2.5"
description = "A simple framework for building complex web applications."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -969,6 +993,7 @@ dotenv = ["python-dotenv"]
name = "flask-cors"
version = "3.0.10"
description = "A Flask extension adding a decorator for CORS support"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -984,6 +1009,7 @@ Six = "*"
name = "graphql-core"
version = "3.2.1"
description = "GraphQL implementation for Python, a port of GraphQL.js, the JavaScript reference implementation for GraphQL."
category = "main"
optional = false
python-versions = ">=3.6,<4"
files = [
@@ -995,6 +1021,7 @@ files = [
name = "idna"
version = "3.3"
description = "Internationalized Domain Names in Applications (IDNA)"
category = "main"
optional = false
python-versions = ">=3.5"
files = [
@@ -1006,6 +1033,7 @@ files = [
name = "importlib-metadata"
version = "4.12.0"
description = "Read metadata from Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1025,6 +1053,7 @@ testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs
name = "iniconfig"
version = "1.1.1"
description = "iniconfig: brain-dead simple config-ini parsing"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1036,6 +1065,7 @@ files = [
name = "itsdangerous"
version = "2.1.2"
description = "Safely pass data to untrusted environments and back."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1047,6 +1077,7 @@ files = [
name = "jinja2"
version = "3.1.2"
description = "A very fast and expressive template engine."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1064,6 +1095,7 @@ i18n = ["Babel (>=2.7)"]
name = "jmespath"
version = "1.0.1"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1075,6 +1107,7 @@ files = [
name = "jschema-to-python"
version = "1.2.3"
description = "Generate source code for Python classes from a JSON schema."
category = "main"
optional = false
python-versions = ">= 2.7"
files = [
@@ -1091,6 +1124,7 @@ pbr = "*"
name = "jsondiff"
version = "2.0.0"
description = "Diff JSON and JSON-like structures in Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1102,6 +1136,7 @@ files = [
name = "jsonpatch"
version = "1.32"
description = "Apply JSON-Patches (RFC 6902)"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -1116,6 +1151,7 @@ jsonpointer = ">=1.9"
name = "jsonpickle"
version = "2.2.0"
description = "Python library for serializing any arbitrary object graph into JSON"
category = "main"
optional = false
python-versions = ">=2.7"
files = [
@@ -1132,6 +1168,7 @@ testing-libs = ["simplejson", "ujson", "yajl"]
name = "jsonpointer"
version = "2.3"
description = "Identify specific nodes in a JSON document (RFC 6901)"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1143,6 +1180,7 @@ files = [
name = "jsonschema"
version = "3.2.0"
description = "An implementation of JSON Schema validation for Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1164,6 +1202,7 @@ format-nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-va
name = "junit-xml"
version = "1.9"
description = "Creates JUnit XML test result documents that can be read by tools such as Jenkins"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1178,6 +1217,7 @@ six = "*"
name = "markupsafe"
version = "2.1.1"
description = "Safely add untrusted strings to HTML/XML markup."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1227,6 +1267,7 @@ files = [
name = "moto"
version = "4.1.2"
description = ""
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1287,6 +1328,7 @@ xray = ["aws-xray-sdk (>=0.93,!=0.96)", "setuptools"]
name = "multidict"
version = "6.0.4"
description = "multidict implementation"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1370,6 +1412,7 @@ files = [
name = "mypy"
version = "1.3.0"
description = "Optional static typing for Python"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -1416,6 +1459,7 @@ reports = ["lxml"]
name = "mypy-boto3-s3"
version = "1.26.0.post1"
description = "Type annotations for boto3.S3 1.26.0 service generated with mypy-boto3-builder 7.11.10"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1430,6 +1474,7 @@ typing-extensions = ">=4.1.0"
name = "mypy-extensions"
version = "1.0.0"
description = "Type system extensions for programs checked with the mypy type checker."
category = "dev"
optional = false
python-versions = ">=3.5"
files = [
@@ -1441,6 +1486,7 @@ files = [
name = "networkx"
version = "2.8.5"
description = "Python package for creating and manipulating graphs and networks"
category = "main"
optional = false
python-versions = ">=3.8"
files = [
@@ -1459,6 +1505,7 @@ test = ["codecov (>=2.1)", "pytest (>=7.1)", "pytest-cov (>=3.0)"]
name = "openapi-schema-validator"
version = "0.2.3"
description = "OpenAPI schema validation for Python"
category = "main"
optional = false
python-versions = ">=3.7.0,<4.0.0"
files = [
@@ -1478,6 +1525,7 @@ strict-rfc3339 = ["strict-rfc3339"]
name = "openapi-spec-validator"
version = "0.4.0"
description = "OpenAPI 2.0 (aka Swagger) and OpenAPI 3.0 spec validator"
category = "main"
optional = false
python-versions = ">=3.7.0,<4.0.0"
files = [
@@ -1498,6 +1546,7 @@ requests = ["requests"]
name = "packaging"
version = "23.0"
description = "Core utilities for Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1509,6 +1558,7 @@ files = [
name = "pathspec"
version = "0.9.0"
description = "Utility library for gitignore style pattern matching of file paths."
category = "dev"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
files = [
@@ -1520,6 +1570,7 @@ files = [
name = "pbr"
version = "5.9.0"
description = "Python Build Reasonableness"
category = "main"
optional = false
python-versions = ">=2.6"
files = [
@@ -1531,6 +1582,7 @@ files = [
name = "platformdirs"
version = "2.5.2"
description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -1546,6 +1598,7 @@ test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock
name = "pluggy"
version = "1.0.0"
description = "plugin and hook calling mechanisms for python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1561,6 +1614,7 @@ testing = ["pytest", "pytest-benchmark"]
name = "prometheus-client"
version = "0.14.1"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1575,6 +1629,7 @@ twisted = ["twisted"]
name = "psutil"
version = "5.9.4"
description = "Cross-platform lib for process and system monitoring in Python."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1601,6 +1656,7 @@ test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
name = "psycopg2-binary"
version = "2.9.6"
description = "psycopg2 - Python-PostgreSQL Database Adapter"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1672,6 +1728,7 @@ files = [
name = "pyasn1"
version = "0.4.8"
description = "ASN.1 types and codecs"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1683,6 +1740,7 @@ files = [
name = "pycparser"
version = "2.21"
description = "C parser in Python"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1694,6 +1752,7 @@ files = [
name = "pyjwt"
version = "2.4.0"
description = "JSON Web Token implementation in Python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1714,6 +1773,7 @@ tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"]
name = "pyparsing"
version = "3.0.9"
description = "pyparsing module - Classes and methods to define and execute parsing grammars"
category = "main"
optional = false
python-versions = ">=3.6.8"
files = [
@@ -1728,6 +1788,7 @@ diagrams = ["jinja2", "railroad-diagrams"]
name = "pypiwin32"
version = "223"
description = ""
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1742,6 +1803,7 @@ pywin32 = ">=223"
name = "pyrsistent"
version = "0.18.1"
description = "Persistent/Functional/Immutable data structures"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1772,6 +1834,7 @@ files = [
name = "pytest"
version = "7.3.1"
description = "pytest: simple powerful testing with Python"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1794,6 +1857,7 @@ testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "no
name = "pytest-asyncio"
version = "0.21.0"
description = "Pytest support for asyncio"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1812,6 +1876,7 @@ testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy
name = "pytest-httpserver"
version = "1.0.8"
description = "pytest-httpserver is a httpserver for pytest"
category = "main"
optional = false
python-versions = ">=3.8,<4.0"
files = [
@@ -1826,6 +1891,7 @@ Werkzeug = ">=2.0.0"
name = "pytest-lazy-fixture"
version = "0.6.3"
description = "It helps to use fixtures in pytest.mark.parametrize"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1840,6 +1906,7 @@ pytest = ">=3.2.5"
name = "pytest-order"
version = "1.1.0"
description = "pytest plugin to run your tests in a specific order"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1857,6 +1924,7 @@ pytest = [
name = "pytest-rerunfailures"
version = "11.1.2"
description = "pytest plugin to re-run tests to eliminate flaky failures"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1868,24 +1936,11 @@ files = [
packaging = ">=17.1"
pytest = ">=5.3"
[[package]]
name = "pytest-split"
version = "0.8.1"
description = "Pytest plugin which splits the test suite to equally sized sub suites based on test execution time."
optional = false
python-versions = ">=3.7.1,<4.0"
files = [
{file = "pytest_split-0.8.1-py3-none-any.whl", hash = "sha256:74b110ea091bd147cc1c5f9665a59506e5cedfa66f96a89fb03e4ab447c2c168"},
{file = "pytest_split-0.8.1.tar.gz", hash = "sha256:2d88bd3dc528689a7a3f58fc12ea165c3aa62e90795e420dfad920afe5612d6d"},
]
[package.dependencies]
pytest = ">=5,<8"
[[package]]
name = "pytest-timeout"
version = "2.1.0"
description = "pytest plugin to abort hanging tests"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1900,6 +1955,7 @@ pytest = ">=5.0.0"
name = "pytest-xdist"
version = "3.3.1"
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1920,6 +1976,7 @@ testing = ["filelock"]
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
@@ -1934,6 +1991,7 @@ six = ">=1.5"
name = "python-jose"
version = "3.3.0"
description = "JOSE implementation in Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1956,6 +2014,7 @@ pycryptodome = ["pyasn1", "pycryptodome (>=3.3.1,<4.0.0)"]
name = "pywin32"
version = "301"
description = "Python for Window Extensions"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1975,6 +2034,7 @@ files = [
name = "pyyaml"
version = "6.0"
description = "YAML parser and emitter for Python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -2024,6 +2084,7 @@ files = [
name = "requests"
version = "2.31.0"
description = "Python HTTP for Humans."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2045,6 +2106,7 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
name = "responses"
version = "0.21.0"
description = "A utility library for mocking out the `requests` Python library."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2063,6 +2125,7 @@ tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asy
name = "rsa"
version = "4.9"
description = "Pure-Python RSA implementation"
category = "main"
optional = false
python-versions = ">=3.6,<4"
files = [
@@ -2077,6 +2140,7 @@ pyasn1 = ">=0.1.3"
name = "ruff"
version = "0.0.269"
description = "An extremely fast Python linter, written in Rust."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -2103,6 +2167,7 @@ files = [
name = "s3transfer"
version = "0.6.0"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -2120,6 +2185,7 @@ crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
name = "sarif-om"
version = "1.0.4"
description = "Classes implementing the SARIF 2.1.0 object model."
category = "main"
optional = false
python-versions = ">= 2.7"
files = [
@@ -2135,6 +2201,7 @@ pbr = "*"
name = "setuptools"
version = "65.5.1"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2151,6 +2218,7 @@ testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (
name = "six"
version = "1.16.0"
description = "Python 2 and 3 compatibility utilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -2162,6 +2230,7 @@ files = [
name = "sshpubkeys"
version = "3.3.1"
description = "SSH public key parser"
category = "main"
optional = false
python-versions = ">=3"
files = [
@@ -2180,6 +2249,7 @@ dev = ["twine", "wheel", "yapf"]
name = "toml"
version = "0.10.2"
description = "Python Library for Tom's Obvious, Minimal Language"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -2191,6 +2261,7 @@ files = [
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2202,6 +2273,7 @@ files = [
name = "types-psutil"
version = "5.9.5.12"
description = "Typing stubs for psutil"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2213,6 +2285,7 @@ files = [
name = "types-psycopg2"
version = "2.9.21.10"
description = "Typing stubs for psycopg2"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2224,6 +2297,7 @@ files = [
name = "types-pytest-lazy-fixture"
version = "0.6.3.3"
description = "Typing stubs for pytest-lazy-fixture"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2235,6 +2309,7 @@ files = [
name = "types-requests"
version = "2.31.0.0"
description = "Typing stubs for requests"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2249,6 +2324,7 @@ types-urllib3 = "*"
name = "types-s3transfer"
version = "0.6.0.post3"
description = "Type annotations and code completion for s3transfer"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
files = [
@@ -2260,6 +2336,7 @@ files = [
name = "types-toml"
version = "0.10.8.6"
description = "Typing stubs for toml"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2271,6 +2348,7 @@ files = [
name = "types-urllib3"
version = "1.26.17"
description = "Typing stubs for urllib3"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2282,6 +2360,7 @@ files = [
name = "typing-extensions"
version = "4.6.1"
description = "Backported and Experimental Type Hints for Python 3.7+"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2293,6 +2372,7 @@ files = [
name = "urllib3"
version = "1.26.11"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4"
files = [
@@ -2309,6 +2389,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
name = "websocket-client"
version = "1.3.3"
description = "WebSocket client for Python with low level API options"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2325,6 +2406,7 @@ test = ["websockets"]
name = "werkzeug"
version = "2.2.3"
description = "The comprehensive WSGI web application library."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2342,6 +2424,7 @@ watchdog = ["watchdog"]
name = "wrapt"
version = "1.14.1"
description = "Module for decorators, wrappers and monkey patching."
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
files = [
@@ -2415,6 +2498,7 @@ files = [
name = "xmltodict"
version = "0.13.0"
description = "Makes working with XML feel like you are working with JSON"
category = "main"
optional = false
python-versions = ">=3.4"
files = [
@@ -2426,6 +2510,7 @@ files = [
name = "yarl"
version = "1.8.2"
description = "Yet another URL library"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2513,6 +2598,7 @@ multidict = ">=4.0"
name = "zipp"
version = "3.8.1"
description = "Backport of pathlib-compatible object wrapper for zip files"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2527,4 +2613,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "e16a65d8fdff4e2173610e552e0e7306e301de2c640ae6082ef6cc5755f566d2"
content-hash = "c6c217033f50430c31b0979b74db222e6bab2301abd8b9f0cce5a9d5bccc578f"

View File

@@ -5,6 +5,7 @@ use proxy::http;
use proxy::metrics;
use anyhow::bail;
use clap::{self, Arg};
use proxy::config::{self, ProxyConfig};
use std::pin::pin;
use std::{borrow::Cow, net::SocketAddr};
@@ -17,70 +18,6 @@ use utils::{project_git_version, sentry_init::init_sentry};
project_git_version!(GIT_VERSION);
use clap::{Parser, ValueEnum};
#[derive(Clone, Debug, ValueEnum)]
enum AuthBackend {
Console,
Postgres,
Link,
}
/// Neon proxy/router
#[derive(Parser)]
#[command(version = GIT_VERSION, about)]
struct ProxyCliArgs {
/// listen for incoming client connections on ip:port
#[clap(short, long, default_value = "127.0.0.1:4432")]
proxy: String,
#[clap(value_enum, long, default_value_t = AuthBackend::Link)]
auth_backend: AuthBackend,
/// listen for management callback connection on ip:port
#[clap(short, long, default_value = "127.0.0.1:7000")]
mgmt: String,
/// listen for incoming http connections (metrics, etc) on ip:port
#[clap(long, default_value = "127.0.0.1:7001")]
http: String,
/// listen for incoming wss connections on ip:port
#[clap(long)]
wss: Option<String>,
/// redirect unauthenticated users to the given uri in case of link auth
#[clap(short, long, default_value = "http://localhost:3000/psql_session/")]
uri: String,
/// cloud API endpoint for authenticating users
#[clap(
short,
long,
default_value = "http://localhost:3000/authenticate_proxy_request/"
)]
auth_endpoint: String,
/// path to TLS key for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'k', long, alias = "ssl-key")]
tls_key: Option<String>,
/// path to TLS cert for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'c', long, alias = "ssl-cert")]
tls_cert: Option<String>,
/// path to directory with TLS certificates for client postgres connections
#[clap(long)]
certs_dir: Option<String>,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
/// how often metrics should be sent to a collection endpoint
#[clap(long)]
metric_collection_interval: Option<String>,
/// cache for `wake_compute` api method (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO)]
wake_compute_cache: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
@@ -90,21 +27,21 @@ async fn main() -> anyhow::Result<()> {
info!("Version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);
let args = ProxyCliArgs::parse();
let args = cli().get_matches();
let config = build_config(&args)?;
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.http.parse()?;
let http_address: SocketAddr = args.get_one::<String>("http").unwrap().parse()?;
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
let mgmt_address: SocketAddr = args.mgmt.parse()?;
let mgmt_address: SocketAddr = args.get_one::<String>("mgmt").unwrap().parse()?;
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?;
let proxy_address: SocketAddr = args.proxy.parse()?;
let proxy_address: SocketAddr = args.get_one::<String>("proxy").unwrap().parse()?;
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
@@ -118,7 +55,7 @@ async fn main() -> anyhow::Result<()> {
cancellation_token.clone(),
));
if let Some(wss_address) = args.wss {
if let Some(wss_address) = args.get_one::<String>("wss") {
let wss_address: SocketAddr = wss_address.parse()?;
info!("Starting wss on {wss_address}");
let wss_listener = TcpListener::bind(wss_address).await?;
@@ -165,24 +102,31 @@ async fn main() -> anyhow::Result<()> {
}
/// ProxyConfig is created at proxy startup, and lives forever.
fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let tls_config = match (&args.tls_key, &args.tls_cert) {
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
let tls_config = match (
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
key_path,
cert_path,
args.certs_dir.as_ref(),
args.get_one::<String>("certs-dir"),
)?),
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
if args.allow_self_signed_compute {
let allow_self_signed_compute: bool = args
.get_one::<String>("allow-self-signed-compute")
.unwrap()
.parse()?;
if allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let metric_collection = match (
&args.metric_collection_endpoint,
&args.metric_collection_interval,
args.get_one::<String>("metric-collection-endpoint"),
args.get_one::<String>("metric-collection-interval"),
) {
(Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
@@ -195,38 +139,145 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
),
};
let auth_backend = match &args.auth_backend {
AuthBackend::Console => {
let config::CacheOptions { size, ttl } = args.wake_compute_cache.parse()?;
let auth_backend = match args.get_one::<String>("auth-backend").unwrap().as_str() {
"console" => {
let config::CacheOptions { size, ttl } = args
.get_one::<String>("wake-compute-cache")
.unwrap()
.parse()?;
info!("Using NodeInfoCache (wake_compute) with size={size} ttl={ttl:?}");
let caches = Box::leak(Box::new(console::caches::ApiCaches {
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
}));
let url = args.auth_endpoint.parse()?;
let url = args.get_one::<String>("auth-endpoint").unwrap().parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let api = console::provider::neon::Api::new(endpoint, caches);
auth::BackendType::Console(Cow::Owned(api), ())
}
AuthBackend::Postgres => {
let url = args.auth_endpoint.parse()?;
"postgres" => {
let url = args.get_one::<String>("auth-endpoint").unwrap().parse()?;
let api = console::provider::mock::Api::new(url);
auth::BackendType::Postgres(Cow::Owned(api), ())
}
AuthBackend::Link => {
let url = args.uri.parse()?;
"link" => {
let url = args.get_one::<String>("uri").unwrap().parse()?;
auth::BackendType::Link(Cow::Owned(url))
}
other => bail!("unsupported auth backend: {other}"),
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
allow_self_signed_compute,
}));
Ok(config)
}
fn cli() -> clap::Command {
clap::Command::new("Neon proxy/router")
.disable_help_flag(true)
.version(GIT_VERSION)
.arg(
Arg::new("proxy")
.short('p')
.long("proxy")
.help("listen for incoming client connections on ip:port")
.default_value("127.0.0.1:4432"),
)
.arg(
Arg::new("auth-backend")
.long("auth-backend")
.value_parser(["console", "postgres", "link"])
.default_value("link"),
)
.arg(
Arg::new("mgmt")
.short('m')
.long("mgmt")
.help("listen for management callback connection on ip:port")
.default_value("127.0.0.1:7000"),
)
.arg(
Arg::new("http")
.long("http")
.help("listen for incoming http connections (metrics, etc) on ip:port")
.default_value("127.0.0.1:7001"),
)
.arg(
Arg::new("wss")
.long("wss")
.help("listen for incoming wss connections on ip:port"),
)
.arg(
Arg::new("uri")
.short('u')
.long("uri")
.help("redirect unauthenticated users to the given uri in case of link auth")
.default_value("http://localhost:3000/psql_session/"),
)
.arg(
Arg::new("auth-endpoint")
.short('a')
.long("auth-endpoint")
.help("cloud API endpoint for authenticating users")
.default_value("http://localhost:3000/authenticate_proxy_request/"),
)
.arg(
Arg::new("tls-key")
.short('k')
.long("tls-key")
.alias("ssl-key") // backwards compatibility
.help("path to TLS key for client postgres connections"),
)
.arg(
Arg::new("tls-cert")
.short('c')
.long("tls-cert")
.alias("ssl-cert") // backwards compatibility
.help("path to TLS cert for client postgres connections"),
)
// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
.arg(
Arg::new("certs-dir")
.long("certs-dir")
.help("path to directory with TLS certificates for client postgres connections"),
)
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")
.help("http endpoint to receive periodic metric updates"),
)
.arg(
Arg::new("metric-collection-interval")
.long("metric-collection-interval")
.help("how often metrics should be sent to a collection endpoint"),
)
.arg(
Arg::new("wake-compute-cache")
.long("wake-compute-cache")
.help("cache for `wake_compute` api method (use `size=0` to disable)")
.default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO),
)
.arg(
Arg::new("allow-self-signed-compute")
.long("allow-self-signed-compute")
.help("Allow self-signed certificates for compute nodes (for testing)")
.default_value("false"),
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn verify_cli() {
cli().debug_assert();
}
}

View File

@@ -262,21 +262,24 @@ pub mod timed_lru {
token: Option<(C, C::LookupInfo<C::Key>)>,
/// The value itself.
value: C::Value,
pub value: C::Value,
}
impl<C: Cache> Cached<C> {
/// Place any entry into this wrapper; invalidation will be a no-op.
pub fn new_uncached(value: C::Value) -> Self {
Self { token: None, value }
/// Unfortunately, rust doesn't let us implement [`From`] or [`Into`].
pub fn new_uncached(value: impl Into<C::Value>) -> Self {
Self {
token: None,
value: value.into(),
}
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(self) -> C::Value {
pub fn invalidate(&self) {
if let Some((cache, info)) = &self.token {
cache.invalidate(info);
}
self.value
}
/// Tell if this entry is actually cached.

View File

@@ -110,7 +110,7 @@ impl<'a> Session<'a> {
impl Session<'_> {
/// Store the cancel token for the given session.
/// This enables query cancellation in `crate::proxy::prepare_client_connection`.
/// This enables query cancellation in [`crate::proxy::handshake`].
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map

View File

@@ -1,9 +1,4 @@
use crate::{
auth::parse_endpoint_param,
cancellation::CancelClosure,
console::errors::WakeComputeError,
error::{io_error, UserFacingError},
};
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -18,7 +13,7 @@ const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
#[derive(Debug, Error)]
pub enum ConnectionError {
/// This error doesn't seem to reveal any secrets; for instance,
/// `tokio_postgres::error::Kind` doesn't contain ip addresses and such.
/// [`tokio_postgres::error::Kind`] doesn't contain ip addresses and such.
#[error("{COULD_NOT_CONNECT}: {0}")]
Postgres(#[from] tokio_postgres::Error),
@@ -29,12 +24,6 @@ pub enum ConnectionError {
TlsError(#[from] native_tls::Error),
}
impl From<WakeComputeError> for ConnectionError {
fn from(value: WakeComputeError) -> Self {
io_error(value).into()
}
}
impl UserFacingError for ConnectionError {
fn to_string_client(&self) -> String {
use ConnectionError::*;

View File

@@ -211,7 +211,7 @@ pub struct CacheOptions {
}
impl CacheOptions {
/// Default options for [`crate::console::provider::NodeInfoCache`].
/// Default options for [`crate::auth::caches::NodeInfoCache`].
pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=4m";
/// Parse cache options passed via cmdline.

View File

@@ -186,18 +186,18 @@ pub trait Api {
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
/// Various caches for [`console`](super).
/// Various caches for [`console`].
pub struct ApiCaches {
/// Cache for the `wake_compute` API method.
pub node_info: NodeInfoCache,

View File

@@ -106,7 +106,7 @@ impl super::Api for Api {
async fn get_auth_info(
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(creds).await
}
@@ -115,7 +115,7 @@ impl super::Api for Api {
async fn wake_compute(
&self,
_extra: &ConsoleReqExtra<'_>,
_creds: &ClientCredentials,
_creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached)

View File

@@ -123,7 +123,7 @@ impl super::Api for Api {
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(extra, creds).await
}
@@ -132,7 +132,7 @@ impl super::Api for Api {
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = creds.project().expect("impossible");

View File

@@ -1,17 +1,19 @@
use anyhow::Context;
use async_trait::async_trait;
use parking_lot::Mutex;
use pq_proto::StartupMessageParams;
use std::fmt;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
use tokio::time;
use crate::config;
use crate::{auth, console};
use crate::{compute, config};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::ConnectMechanism;
use crate::proxy::{
can_retry_tokio_postgres_error, invalidate_cache, retry_after, try_wake,
NUM_RETRIES_WAKE_COMPUTE,
};
use tracing::error;
use tracing::info;
@@ -185,27 +187,6 @@ impl GlobalConnPool {
}
}
struct TokioMechanism<'a> {
conn_info: &'a ConnInfo,
}
#[async_trait]
impl ConnectMechanism for TokioMechanism<'_> {
type Connection = tokio_postgres::Client;
type ConnectError = tokio_postgres::Error;
type Error = anyhow::Error;
async fn connect_once(
&self,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError> {
connect_to_compute_once(node_info, self.conn_info, timeout).await
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
@@ -239,18 +220,72 @@ async fn connect_to_compute(
application_name: Some(APP_NAME),
};
let node_info = creds
.wake_compute(&extra)
.await?
.context("missing cache entry from wake_compute")?;
let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
crate::proxy::connect_to_compute(&TokioMechanism { conn_info }, node_info, &extra, &creds).await
let mut num_retries = 0;
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
loop {
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, &extra, &creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
}
match connect_to_compute_once(node_info, conn_info).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
return Err(e.into());
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e.into());
}
}
}
num_retries += 1;
info!(num_retries, "retrying connect");
}
}
fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool {
match err {
// retry all errors at least once
_ if num_retries == 0 => true,
_ if num_retries >= NUM_RETRIES_WAKE_COMPUTE => false,
err => can_retry_tokio_postgres_error(err),
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
timeout: time::Duration,
) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();
@@ -259,7 +294,6 @@ async fn connect_to_compute_once(
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;

View File

@@ -1,8 +1,5 @@
use crate::{
cancellation::CancelMap,
config::ProxyConfig,
error::io_error,
proxy::{handle_client, ClientMode},
cancellation::CancelMap, config::ProxyConfig, error::io_error, proxy::handle_ws_client,
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -153,12 +150,12 @@ async fn serve_websocket(
hostname: Option<String>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
handle_client(
handle_ws_client(
config,
cancel_map,
session_id,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },
hostname,
)
.await?;
Ok(())
@@ -224,18 +221,6 @@ async fn ws_handler(
);
r
})
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
.header("Access-Control-Allow-Origin", "*")
.header(
"Access-Control-Allow-Headers",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In",
)
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
.body(Body::empty())
.map_err(|e| ApiError::BadRequest(e.into()))
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}

View File

@@ -11,16 +11,16 @@ use crate::{
errors::{ApiError, WakeComputeError},
messages::MetricsAuxInfo,
},
error::io_error,
stream::{PqStream, Stream},
};
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use hyper::StatusCode;
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::{error::Error, io, ops::ControlFlow, sync::Arc};
use std::{error::Error, ops::ControlFlow, sync::Arc};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
time,
@@ -31,8 +31,7 @@ use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
const NUM_RETRIES_CONNECT: u32 = 10;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
@@ -104,8 +103,7 @@ pub async fn task_main(
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket, ClientMode::Tcp)
.await
handle_client(config, &cancel_map, session_id, socket).await
}
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
@@ -130,50 +128,14 @@ pub async fn task_main(
Ok(())
}
pub enum ClientMode {
Tcp,
Websockets { hostname: Option<String> },
}
/// Abstracts the logic of handling TCP vs WS clients
impl ClientMode {
fn allow_cleartext(&self) -> bool {
match self {
ClientMode::Tcp => false,
ClientMode::Websockets { .. } => true,
}
}
fn allow_self_signed_compute(&self, config: &ProxyConfig) -> bool {
match self {
ClientMode::Tcp => config.allow_self_signed_compute,
ClientMode::Websockets { .. } => false,
}
}
fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
match self {
ClientMode::Tcp => s.sni_hostname(),
ClientMode::Websockets { hostname } => hostname.as_deref(),
}
}
fn handshake_tls<'a>(&self, tls: Option<&'a TlsConfig>) -> Option<&'a TlsConfig> {
match self {
ClientMode::Tcp => tls,
// TLS is None here if using websockets, because the connection is already encrypted.
ClientMode::Websockets { .. } => None,
}
}
}
// TODO(tech debt): unite this with its twin below.
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
pub async fn handle_ws_client(
config: &'static ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: S,
mode: ClientMode,
stream: impl AsyncRead + AsyncWrite + Unpin,
hostname: Option<String>,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
@@ -182,8 +144,10 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
let tls = config.tls_config.as_ref();
let hostname = hostname.as_deref();
let do_handshake = handshake(stream, mode.handshake_tls(tls), cancel_map);
// TLS is None here, because the connection is already encrypted.
let do_handshake = handshake(stream, None, cancel_map);
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
@@ -191,7 +155,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
// Extract credentials which we're going to use for auth.
let creds = {
let hostname = mode.hostname(stream.get_ref());
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
@@ -205,15 +168,59 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
};
let client = Client::new(stream, creds, &params, session_id, false);
cancel_map
.with_session(|session| client.connect_to_db(session, true))
.await
}
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
async fn handle_client(
config: &'static ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
}
let tls = config.tls_config.as_ref();
let do_handshake = handshake(stream, tls, cancel_map);
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
// Extract credentials which we're going to use for auth.
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_names))
.transpose();
match result {
Ok(creds) => creds,
Err(e) => stream.throw_error(e).await?,
}
};
let allow_self_signed_compute = config.allow_self_signed_compute;
let client = Client::new(
stream,
creds,
&params,
session_id,
mode.allow_self_signed_compute(config),
allow_self_signed_compute,
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.with_session(|session| client.connect_to_db(session, false))
.await
}
@@ -296,18 +303,18 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg {
pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
node_info.invalidate().config
}
/// Try to connect to the compute node once.
@@ -324,118 +331,89 @@ async fn connect_to_compute_once(
.await
}
enum ConnectionState<E> {
Cached(console::CachedNodeInfo),
Invalid(compute::ConnCfg, E),
}
#[async_trait]
pub trait ConnectMechanism {
type Connection;
type ConnectError;
type Error: From<Self::ConnectError>;
async fn connect_once(
&self,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError>;
fn update_connect_config(&self, conf: &mut compute::ConnCfg);
}
pub struct TcpMechanism<'a> {
/// KV-dictionary with PostgreSQL connection params.
pub params: &'a StartupMessageParams,
}
#[async_trait]
impl ConnectMechanism for TcpMechanism<'_> {
type Connection = PostgresConnection;
type ConnectError = compute::ConnectionError;
type Error = compute::ConnectionError;
async fn connect_once(
&self,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
connect_to_compute_once(node_info, timeout).await
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
config.set_startup_params(self.params);
}
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
pub async fn connect_to_compute<M: ConnectMechanism>(
mechanism: &M,
mut node_info: console::CachedNodeInfo,
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
params: &StartupMessageParams,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
mechanism.update_connect_config(&mut node_info.config);
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries = 0;
let mut state = ConnectionState::<M::ConnectError>::Cached(node_info);
let mut wait_duration = time::Duration::ZERO;
let mut should_wake_with_error = None;
loop {
match state {
ConnectionState::Invalid(config, err) => {
match try_wake(&config, extra, creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err.into()),
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
state = ConnectionState::Invalid(config, err);
let wait_duration = retry_after(num_retries);
num_retries += 1;
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
info!(num_retries, "retrying wake compute");
time::sleep(wait_duration).await;
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(mut node_info))) => {
mechanism.update_connect_config(&mut node_info.config);
state = ConnectionState::Cached(node_info)
}
if !wait_duration.is_zero() {
time::sleep(wait_duration).await;
}
// try wake the compute node if we have determined it's sensible to do so
if let Some(err) = should_wake_with_error.take() {
match try_wake(node_info, extra, creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err),
// there was an error communicating with the control plane
Err(e) => return Err(io_error(e).into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
wait_duration = retry_after(num_retries);
should_wake_with_error = Some(err);
num_retries += 1;
info!(num_retries, "retrying wake compute");
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(()))) => {}
}
ConnectionState::Cached(node_info) => {
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !e.should_retry(num_retries) {
return Err(e.into());
}
}
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
state = ConnectionState::Invalid(invalidate_cache(node_info), e);
} else {
state = ConnectionState::Cached(node_info);
}
// Set a shorter timeout for the initial connection attempt.
//
// In case we try to connect to an outdated address that is no longer valid, the
// default behavior of Kubernetes is to drop the packets, causing us to wait for
// the entire timeout period. We want to fail fast in such cases.
//
// A specific case to consider is when we have cached compute node information
// with a 4-minute TTL (Time To Live), but the user has executed a `/suspend` API
// call, resulting in the nonexistence of the compute node.
//
// We only use caching in case of scram proxy backed by the console, so reduce
// the timeout only in that case.
let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
let timeout = if is_scram_proxy && num_retries == 0 {
time::Duration::from_secs(2)
} else {
time::Duration::from_secs(10)
};
let wait_duration = retry_after(num_retries);
num_retries += 1;
// do this again to ensure we have username?
node_info.config.set_startup_params(params);
info!(num_retries, "retrying wake compute");
time::sleep(wait_duration).await;
}
match connect_to_compute_once(node_info, timeout).await {
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if !can_retry_error(&e, num_retries) {
return Err(e);
}
wait_duration = retry_after(num_retries);
// after the first connect failure,
// we should invalidate the cache and wake up a new compute node
if num_retries == 0 {
invalidate_cache(node_info);
should_wake_with_error = Some(e);
}
}
}
num_retries += 1;
info!(num_retries, "retrying connect");
}
}
@@ -443,11 +421,11 @@ where
/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
/// * Returns Ok(Some(false)) if the wakeup succeeded
/// * Returns Ok(None) or Err(e) if there was an error
async fn try_wake(
config: &compute::ConnCfg,
pub async fn try_wake(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<Option<ControlFlow<console::CachedNodeInfo>>, WakeComputeError> {
) -> Result<Option<ControlFlow<()>>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
match creds.wake_compute(extra).await {
// retry wake if the compute was in an invalid state
@@ -457,69 +435,53 @@ async fn try_wake(
})) => Ok(Some(ControlFlow::Continue(()))),
// Update `node_info` and try again.
Ok(Some(mut new)) => {
new.config.reuse_password(config);
Ok(Some(ControlFlow::Break(new)))
new.config.reuse_password(&node_info.config);
*node_info = new;
Ok(Some(ControlFlow::Break(())))
}
Err(e) => Err(e),
Ok(None) => Ok(None),
}
}
pub trait ShouldRetry {
fn could_retry(&self) -> bool;
fn should_retry(&self, num_retries: u32) -> bool {
match self {
// retry all errors at least once
_ if num_retries == 0 => true,
_ if num_retries >= NUM_RETRIES_CONNECT => false,
err => err.could_retry(),
}
fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool {
match err {
// retry all errors at least once
_ if num_retries == 0 => true,
_ if num_retries >= NUM_RETRIES_WAKE_COMPUTE => false,
compute::ConnectionError::Postgres(err) => can_retry_tokio_postgres_error(err),
compute::ConnectionError::CouldNotConnect(err) => is_io_connection_err(err),
_ => false,
}
}
impl ShouldRetry for io::Error {
fn could_retry(&self) -> bool {
use std::io::ErrorKind;
matches!(
self.kind(),
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
)
pub fn can_retry_tokio_postgres_error(err: &tokio_postgres::Error) -> bool {
if let Some(io_err) = err.source().and_then(|x| x.downcast_ref()) {
is_io_connection_err(io_err)
} else if let Some(db_err) = err.source().and_then(|x| x.downcast_ref()) {
is_sql_connection_err(db_err)
} else {
false
}
}
impl ShouldRetry for tokio_postgres::error::DbError {
fn could_retry(&self) -> bool {
use tokio_postgres::error::SqlState;
matches!(
self.code(),
&SqlState::CONNECTION_FAILURE
| &SqlState::CONNECTION_EXCEPTION
| &SqlState::CONNECTION_DOES_NOT_EXIST
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
)
}
fn is_sql_connection_err(err: &tokio_postgres::error::DbError) -> bool {
use tokio_postgres::error::SqlState;
matches!(
err.code(),
&SqlState::CONNECTION_FAILURE
| &SqlState::CONNECTION_EXCEPTION
| &SqlState::CONNECTION_DOES_NOT_EXIST
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
)
}
impl ShouldRetry for tokio_postgres::Error {
fn could_retry(&self) -> bool {
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
io::Error::could_retry(io_err)
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
tokio_postgres::error::DbError::could_retry(db_err)
} else {
false
}
}
}
impl ShouldRetry for compute::ConnectionError {
fn could_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.could_retry(),
compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
_ => false,
}
}
fn is_io_connection_err(err: &std::io::Error) -> bool {
use std::io::ErrorKind;
matches!(
err.kind(),
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
)
}
pub fn retry_after(num_retries: u32) -> time::Duration {
@@ -675,8 +637,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
node_info.allow_self_signed_compute = allow_self_signed_compute;
let aux = node_info.aux.clone();
let mut node = connect_to_compute(&TcpMechanism { params }, node_info, &extra, &creds)
let mut node = connect_to_compute(&mut node_info, params, &extra, &creds)
.or_else(|e| stream.throw_error(e))
.await?;
@@ -687,6 +648,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
// immediately after opening the connection.
let (stream, read_buf) = stream.into_inner();
node.stream.write_all(&read_buf).await?;
proxy_pass(stream, node.stream, &aux).await
proxy_pass(stream, node.stream, &node_info.aux).await
}
}

View File

@@ -12,7 +12,7 @@ mod messages;
mod secret;
mod signature;
#[cfg(any(test, doc))]
#[cfg(test)]
mod password;
pub use exchange::Exchange;

View File

@@ -36,7 +36,6 @@ pytest-httpserver = "^1.0.8"
aiohttp = "3.7.4"
pytest-rerunfailures = "^11.1.2"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"
[tool.poetry.group.dev.dependencies]
black = "^23.3.0"
@@ -79,7 +78,6 @@ module = [
ignore_missing_imports = true
[tool.ruff]
target-version = "py39"
extend-exclude = ["vendor/"]
ignore = ["E501"]
select = [
@@ -87,5 +85,4 @@ select = [
"F", # Pyflakes
"I", # isort
"W", # pycodestyle
"B", # bugbear
]

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.71.0"
channel = "1.70.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -37,7 +37,7 @@ use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope};
use utils::auth::JwtAuth;
use utils::{
id::NodeId,
logging::{self, LogFormat},
@@ -72,10 +72,6 @@ struct Args {
/// Listen endpoint for receiving/sending WAL in the form host:port.
#[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
listen_pg: String,
/// Listen endpoint for receiving/sending WAL in the form host:port allowing
/// only tenant scoped auth tokens. Pointless if auth is disabled.
#[arg(long, default_value = None, verbatim_doc_comment)]
listen_pg_tenant_only: Option<String>,
/// Listen http endpoint for management and metrics in the form host:port.
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
listen_http: String,
@@ -98,7 +94,7 @@ struct Args {
broker_keepalive_interval: Duration,
/// Peer safekeeper is considered dead after not receiving heartbeats from
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
heartbeat_timeout: Duration,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
@@ -183,7 +179,6 @@ async fn main() -> anyhow::Result<()> {
workdir,
my_id: id,
listen_pg_addr: args.listen_pg,
listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
listen_http_addr: args.listen_http,
availability_zone: args.availability_zone,
no_sync: args.no_sync,
@@ -227,21 +222,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
e
})?;
let pg_listener_tenant_only =
if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
info!(
"starting safekeeper tenant scoped WAL service on {}",
listen_pg_addr_tenant_only
);
let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e
})?;
Some(listener)
} else {
None
};
info!(
"starting safekeeper HTTP service on {}",
conf.listen_http_addr
@@ -273,34 +253,14 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf_,
pg_listener,
Some(Scope::SafekeeperData),
))
.spawn(wal_service::task_main(conf_, pg_listener))
// wrap with task name for error reporting
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf_,
pg_listener_tenant_only,
Some(Scope::Tenant),
))
// wrap with task name for error reporting
.map(|res| ("WAL service tenant only main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
}
let conf_ = conf.clone();
let http_handle = current_thread_rt
.as_ref()

View File

@@ -163,9 +163,8 @@ impl Deref for FileStorage {
#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
///
/// For a description, see <https://lwn.net/Articles/457667/>.
/// persists state durably to underlying storage
/// for description see https://lwn.net/Articles/457667/
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();

View File

@@ -34,8 +34,6 @@ pub struct SafekeeperPostgresHandler {
pub ttid: TenantTimelineId,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
/// Auth scope allowed on the connections. None if auth is not configured.
allowed_auth_scope: Option<Scope>,
claims: Option<Claims>,
io_metrics: Option<TrafficMetrics>,
}
@@ -149,16 +147,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.unwrap()
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
let scope = self
.allowed_auth_scope
.expect("auth is enabled but scope is not configured");
// The handler might be configured to allow only tenant scope tokens.
if matches!(scope, Scope::Tenant) && !matches!(data.claims.scope, Scope::Tenant) {
return Err(QueryError::Other(anyhow::anyhow!(
"passed JWT token is for full access, but only tenant scope is allowed"
)));
}
if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
return Err(QueryError::Other(anyhow::anyhow!(
"jwt token scope is Tenant, but tenant id is missing"
@@ -227,12 +215,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
}
impl SafekeeperPostgresHandler {
pub fn new(
conf: SafeKeeperConf,
conn_id: u32,
io_metrics: Option<TrafficMetrics>,
allowed_auth_scope: Option<Scope>,
) -> Self {
pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option<TrafficMetrics>) -> Self {
SafekeeperPostgresHandler {
conf,
appname: None,
@@ -241,7 +224,6 @@ impl SafekeeperPostgresHandler {
ttid: TenantTimelineId::empty(),
conn_id,
claims: None,
allowed_auth_scope,
io_metrics,
}
}

View File

@@ -53,7 +53,6 @@ pub struct SafeKeeperConf {
pub workdir: PathBuf,
pub my_id: NodeId,
pub listen_pg_addr: String,
pub listen_pg_addr_tenant_only: Option<String>,
pub listen_http_addr: String,
pub availability_zone: Option<String>,
pub no_sync: bool,
@@ -86,7 +85,6 @@ impl SafeKeeperConf {
workdir: PathBuf::from("./"),
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_pg_addr_tenant_only: None,
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
remote_storage: None,

Some files were not shown because too many files have changed in this diff Show More