Mirror of https://github.com/neondatabase/neon.git
Synced 2026-01-17 10:22:56 +00:00

Compare commits: skyzh/try-... against main

50 commits (SHA1):
489c7a20f4, 015b1c7cb3, 5e85c02f37, c17d3fe645, 4ac447c75d,
26b47b5beb, 85ce109361, 77e22e4bf0, d96cea1917, 312a74f11f,
df4e37b7cc, b4a63e0a34, f8fc0bf3c0, 8fe7596120, f3ee6e818d,
edd60730c8, 975b95f4cd, 01c39f378e, 4d3b28bd2e, 81ddd10be6,
e470997627, eb2741758b, f3a0e4f255, 842a5091d5, 056056bef0,
e989e0da78, b3c1aecd11, 1dce2a9e74, ca88521653, 07c3cfd2a0,
7cd0066212, bf3a1529bf, 65d1be6e90, 16eb8dda3d, bb32f1b3d0,
5585c32cee, 0ffdc98e20, 62d844e657, 1bb434ab74, dbde37c53a,
5e3cb2ab07, 61f267d8f9, e2411818ef, 58327cbba8, 568927a8a0,
1ed7252950, 30b57334ef, d487ba2b9b, e7a1d5de94, 6be572177c
@@ -146,7 +146,9 @@ jobs:
      with:
        file: build-tools/Dockerfile
        context: .
        provenance: false
        attests: |
          type=provenance,mode=max
          type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
        push: true
        pull: true
        build-args: |
.github/workflows/build_and_test.yml (vendored, 12 changed lines)

@@ -634,7 +634,9 @@ jobs:
          DEBIAN_VERSION=bookworm
        secrets: |
          SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }}
        provenance: false
        attests: |
          type=provenance,mode=max
          type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
        push: true
        pull: true
        file: Dockerfile
@@ -747,7 +749,9 @@ jobs:
          PG_VERSION=${{ matrix.version.pg }}
          BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
          DEBIAN_VERSION=${{ matrix.version.debian }}
        provenance: false
        attests: |
          type=provenance,mode=max
          type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
        push: true
        pull: true
        file: compute/compute-node.Dockerfile
@@ -766,7 +770,9 @@ jobs:
          PG_VERSION=${{ matrix.version.pg }}
          BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
          DEBIAN_VERSION=${{ matrix.version.debian }}
        provenance: false
        attests: |
          type=provenance,mode=max
          type=sbom,generator=docker.io/docker/buildkit-syft-scanner:1
        push: true
        pull: true
        file: compute/compute-node.Dockerfile
.github/workflows/large_oltp_growth.yml (vendored, 3 changed lines)

@@ -2,9 +2,6 @@ name: large oltp growth
# workflow to grow the reuse branch of large oltp benchmark continuously (about 16 GB per run)

on:
  # uncomment to run on push for debugging your PR
  # push:
  #  branches: [ bodobolero/increase_large_oltp_workload ]

  schedule:
    # * is a special character in YAML so you have to quote this string
.github/workflows/pg-clients.yml (vendored, 3 changed lines)

@@ -72,9 +72,10 @@ jobs:
      options: --init --user root
    services:
      clickhouse:
        image: clickhouse/clickhouse-server:24.8
        image: clickhouse/clickhouse-server:25.6
        env:
          CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
          PGSSLCERT: /tmp/postgresql.crt
        ports:
          - 9000:9000
          - 8123:8123
Cargo.lock (generated, 101 changed lines)

@@ -145,9 +145,9 @@ dependencies = [

[[package]]
name = "anyhow"
version = "1.0.94"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
dependencies = [
 "backtrace",
]
@@ -2402,9 +2402,9 @@ dependencies = [

[[package]]
name = "futures"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
 "futures-channel",
 "futures-core",
@@ -2433,9 +2433,9 @@ checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"

[[package]]
name = "futures-executor"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
 "futures-core",
 "futures-task",
@@ -2510,6 +2510,33 @@ dependencies = [
 "slab",
]

[[package]]
name = "gcp_auth"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf67f30198e045a039264c01fb44659ce82402d7771c50938beb41a5ac87733"
dependencies = [
 "async-trait",
 "base64 0.22.1",
 "bytes",
 "chrono",
 "home",
 "http 1.3.1",
 "http-body-util",
 "hyper 1.4.1",
 "hyper-rustls 0.27.5",
 "hyper-util",
 "ring",
 "rustls-pemfile 2.1.1",
 "serde",
 "serde_json",
 "thiserror 1.0.69",
 "tokio",
 "tracing",
 "tracing-futures",
 "url",
]

[[package]]
name = "gen_ops"
version = "0.4.0"
@@ -2840,6 +2867,15 @@ dependencies = [
 "digest",
]

[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
 "windows-sys 0.59.0",
]

[[package]]
name = "hostname"
version = "0.4.0"
@@ -3075,6 +3111,24 @@ dependencies = [
 "tower-service",
]

[[package]]
name = "hyper-rustls"
version = "0.27.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
dependencies = [
 "futures-util",
 "http 1.3.1",
 "hyper 1.4.1",
 "hyper-util",
 "rustls 0.23.29",
 "rustls-native-certs 0.8.0",
 "rustls-pki-types",
 "tokio",
 "tokio-rustls 0.26.2",
 "tower-service",
]

[[package]]
name = "hyper-timeout"
version = "0.5.1"
@@ -3852,6 +3906,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"

[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
 "mime",
 "unicase",
]

[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -5077,8 +5141,6 @@ dependencies = [
 "crc32c",
 "criterion",
 "env_logger",
 "log",
 "memoffset 0.9.0",
 "once_cell",
 "postgres",
 "postgres_ffi_types",
@@ -5519,6 +5581,7 @@ dependencies = [
 "workspace_hack",
 "x509-cert",
 "zerocopy 0.8.24",
 "zeroize",
]

[[package]]
@@ -5873,8 +5936,11 @@ dependencies = [
 "bytes",
 "camino",
 "camino-tempfile",
 "chrono",
 "futures",
 "futures-util",
 "gcp_auth",
 "http 1.3.1",
 "http-body-util",
 "http-types",
 "humantime-serde",
@@ -5895,7 +5961,9 @@ dependencies = [
 "tokio-util",
 "toml_edit",
 "tracing",
 "url",
 "utils",
 "uuid",
]

[[package]]
@@ -5925,6 +5993,7 @@ dependencies = [
 "js-sys",
 "log",
 "mime",
 "mime_guess",
 "once_cell",
 "percent-encoding",
 "pin-project-lite",
@@ -8034,6 +8103,16 @@ dependencies = [
 "tracing-subscriber",
]

[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
 "pin-project",
 "tracing",
]

[[package]]
name = "tracing-log"
version = "0.2.0"
@@ -8209,6 +8288,12 @@ dependencies = [
 "libc",
]

[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"

[[package]]
name = "unicode-bidi"
version = "0.3.17"
@@ -135,7 +135,6 @@ lock_api = "0.4.13"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
moka = { version = "0.12", features = ["sync"] }
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
@@ -234,9 +233,10 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.8", features = ["derive", "simd"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }
zerocopy = { version = "0.8", features = ["derive", "simd"] }
zeroize = "1.8"

## TODO replace this with tracing
env_logger = "0.11"
Dockerfile (10 changed lines)

@@ -78,6 +78,7 @@ WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ARG IO_ALIGNMENT=512
ENV CARGO_FEATURES="default"

# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
@@ -101,9 +102,14 @@ COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Carg
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
    set -e \
    && if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
        export CARGO_FEATURES="rest_broker"; \
        export CARGO_FEATURES="${CARGO_FEATURES},rest_broker"; \
    fi \
    && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
    && if [ "$IO_ALIGNMENT" = "4k" ]; then \
        export CARGO_FEATURES="${CARGO_FEATURES},io-align-4k"; \
    elif [ "$IO_ALIGNMENT" = "512" ]; then \
        export CARGO_FEATURES="${CARGO_FEATURES},io-align-512"; \
    fi \
    && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo auditable build \
    --features $CARGO_FEATURES \
    --bin pg_sni_router \
    --bin pageserver \
README.md (10 changed lines)

@@ -1,13 +1,13 @@
[](https://neon.tech)
[](https://neon.com)

# Neon

Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
Neon is an open-source serverless Postgres database platform. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.

## Quick start
Try the [Neon Free Tier](https://neon.tech/github) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Try the [Neon Free Tier](https://neon.com/signup) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.com/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.com/docs/connect/connect-from-any-app/) for connection instructions.

Alternatively, compile and run the project [locally](#running-local-installation).

@@ -301,8 +301,8 @@ See also README files in some source directories, and `rustdoc` style documentat

Other resources:

- [SELECT 'Hello, World'](https://neon.tech/blog/hello-world/): Blog post by Nikita Shamgunov on the high level architecture
- [Architecture decisions in Neon](https://neon.tech/blog/architecture-decisions-in-neon/): Blog post by Heikki Linnakangas
- [SELECT 'Hello, World'](https://neon.com/blog/hello-world/): Blog post by Nikita Shamgunov on the high level architecture
- [Architecture decisions in Neon](https://neon.com/blog/architecture-decisions-in-neon/): Blog post by Heikki Linnakangas
- [Neon: Serverless PostgreSQL!](https://www.youtube.com/watch?v=rES0yzeERns): Presentation on storage system by Heikki Linnakangas in the CMU Database Group seminar series

### Postgres-specific terms
@@ -299,6 +299,7 @@ WORKDIR /home/nonroot
ENV RUSTC_VERSION=1.88.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG CARGO_AUDITABLE_VERSION=0.7.0
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
@@ -314,14 +315,16 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
    . "$HOME/.cargo/env" && \
    cargo --version && rustup --version && \
    rustup component add llvm-tools rustfmt clippy && \
    cargo install rustfilt --locked --version "${RUSTFILT_VERSION}" && \
    cargo install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \
    cargo install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \
    cargo install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \
    cargo install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \
    cargo install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \
    cargo install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \
        --features postgres-bundled --no-default-features && \
    cargo install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" && \
    cargo auditable install cargo-auditable --locked --version "${CARGO_AUDITABLE_VERSION}" --force && \
    cargo auditable install rustfilt --version "${RUSTFILT_VERSION}" && \
    cargo auditable install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \
    cargo auditable install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \
    cargo auditable install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \
    cargo auditable install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \
    cargo auditable install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \
    cargo auditable install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \
        --features postgres-bundled --no-default-features && \
    rm -rf /home/nonroot/.cargo/registry && \
    rm -rf /home/nonroot/.cargo/git
@@ -1 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;

@@ -1 +1 @@
SELECT checkpoints_req FROM pg_stat_bgwriter;
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;

@@ -1 +1 @@
SELECT checkpoints_timed FROM pg_stat_bgwriter;
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;

@@ -1 +1 @@
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
END AS lsn;

@@ -1,7 +1,7 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

@@ -1,7 +1,7 @@
SELECT
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

@@ -1,9 +1,9 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

@@ -1 +1 @@
SELECT current_setting('max_connections') as max_connections;
SELECT pg_catalog.current_setting('max_connections') AS max_connections;

@@ -1,4 +1,4 @@
SELECT datname database_name,
age(datfrozenxid) frozen_xid_age
FROM pg_database
pg_catalog.age(datfrozenxid) frozen_xid_age
FROM pg_catalog.pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

@@ -1,4 +1,4 @@
SELECT datname database_name,
mxid_age(datminmxid) min_mxid_age
FROM pg_database
pg_catalog.mxid_age(datminmxid) min_mxid_age
FROM pg_catalog.pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE 0
END AS lsn;

@@ -1 +1 @@
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;

@@ -1 +1 @@
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;

@@ -1,5 +1,5 @@
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
FROM pg_catalog.pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;

@@ -3,6 +3,6 @@
-- minutes.

SELECT
x::text as duration_seconds,
x::pg_catalog.text AS duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);

@@ -3,6 +3,6 @@

SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
) AS t (x);

@@ -1 +1 @@
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;

@@ -1,3 +1,3 @@
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
FROM pg_replication_slots
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
FROM pg_catalog.pg_replication_slots
WHERE slot_type = 'logical';

@@ -1 +1 @@
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';

@@ -1,13 +1,13 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.

SELECT pg_database_size(datname) AS db_size,
SELECT pg_catalog.pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
FROM pg_stat_database
FROM pg_catalog.pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with

@@ -3,4 +3,4 @@
-- replay LSN may have advanced past the receive LSN we are using for the
-- calculation.

SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;

@@ -1,5 +1,5 @@
SELECT
CASE
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;

@@ -1,10 +1,10 @@
SELECT
slot_name,
pg_wal_lsn_diff(
pg_catalog.pg_wal_lsn_diff(
CASE
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
ELSE pg_catalog.pg_current_wal_lsn()
END,
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
FROM pg_catalog.pg_replication_slots
WHERE active = false;

@@ -4,4 +4,4 @@ SELECT
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_is_lost
FROM pg_replication_slots;
FROM pg_catalog.pg_replication_slots;
@@ -1,5 +1,11 @@
commit 5eb393810cf7c7bafa4e394dad2e349e2a8cb2cb
Author: Alexey Masterov <alexey.masterov@databricks.com>
Date: Mon Jul 28 18:11:02 2025 +0200

    Patch for pg_repack

diff --git a/regress/Makefile b/regress/Makefile
index bf6edcb..89b4c7f 100644
index bf6edcb..110e734 100644
--- a/regress/Makefile
+++ b/regress/Makefile
@@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
@@ -7,18 +13,36 @@ index bf6edcb..89b4c7f 100644
#

-REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger
+REGRESS := init-extension repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger
+REGRESS := init-extension noautovacuum repack-setup repack-run error-on-invalid-idx no-error-on-invalid-idx after-schema repack-check nosuper get_order_by trigger autovacuum

USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/autovacuum.out b/regress/expected/autovacuum.out
new file mode 100644
index 0000000..e7f2363
--- /dev/null
+++ b/regress/expected/autovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/noautovacuum.out b/regress/expected/noautovacuum.out
new file mode 100644
index 0000000..fc7978e
--- /dev/null
+++ b/regress/expected/noautovacuum.out
@@ -0,0 +1,7 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -50,14 +74,22 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/autovacuum.sql b/regress/sql/autovacuum.sql
new file mode 100644
index 0000000..a8eda63
--- /dev/null
+++ b/regress/sql/autovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='on';
+SELECT pg_reload_conf();
diff --git a/regress/sql/noautovacuum.sql b/regress/sql/noautovacuum.sql
new file mode 100644
index 0000000..13d4836
--- /dev/null
+++ b/regress/sql/noautovacuum.sql
@@ -0,0 +1,2 @@
+ALTER SYSTEM SET autovacuum='off';
+SELECT pg_reload_conf();
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql
@@ -82,6 +82,15 @@ struct Cli {
    #[arg(long, default_value_t = 3081)]
    pub internal_http_port: u16,

    /// Backwards-compatible --http-port for Hadron deployments. Functionally the
    /// same as --external-http-port.
    #[arg(
        long,
        conflicts_with = "external_http_port",
        conflicts_with = "internal_http_port"
    )]
    pub http_port: Option<u16>,

    #[arg(short = 'D', long, value_name = "DATADIR")]
    pub pgdata: String,

@@ -181,6 +190,26 @@ impl Cli {
    }
}

// Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port`
// arg is used and acts the same as `--external-http-port`. The internal http port is defined
// to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so
// we need to be careful with the ports to choose.
fn get_external_http_port(cli: &Cli) -> u16 {
    if cli.lakebase_mode {
        return cli.http_port.unwrap_or(cli.external_http_port);
    }
    cli.external_http_port
}
fn get_internal_http_port(cli: &Cli) -> u16 {
    if cli.lakebase_mode {
        return cli
            .http_port
            .map(|p| p + 1)
            .unwrap_or(cli.internal_http_port);
    }
    cli.internal_http_port
}
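To make the Hadron port rule above concrete, here is a small illustrative sketch, runnable as a unit test in a scratch crate. MiniCli and resolve_ports are hypothetical stand-ins of my own, reduced to the fields the real helpers read; the selection logic mirrors get_external_http_port / get_internal_http_port.

// Hypothetical, reduced stand-in for the real clap-derived `Cli`.
struct MiniCli {
    lakebase_mode: bool,
    http_port: Option<u16>,
    external_http_port: u16,
    internal_http_port: u16,
}

// In lakebase (Hadron) mode, --http-port wins for the external port and the internal
// port becomes http_port + 1; otherwise the explicit --external-http-port /
// --internal-http-port values are used unchanged.
fn resolve_ports(cli: &MiniCli) -> (u16, u16) {
    if cli.lakebase_mode {
        (
            cli.http_port.unwrap_or(cli.external_http_port),
            cli.http_port.map(|p| p + 1).unwrap_or(cli.internal_http_port),
        )
    } else {
        (cli.external_http_port, cli.internal_http_port)
    }
}

#[test]
fn hadron_port_resolution() {
    let cli = MiniCli {
        lakebase_mode: true,
        http_port: Some(5000),
        external_http_port: 3080,
        internal_http_port: 3081,
    };
    // --http-port=5000 resolves to external 5000 and internal 5001.
    assert_eq!(resolve_ports(&cli), (5000, 5001));
}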

fn main() -> Result<()> {
    let cli = Cli::parse();

@@ -205,13 +234,18 @@ fn main() -> Result<()> {
    // enable core dumping for all child processes
    setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

    installed_extensions::initialize_metrics();
    hadron_metrics::initialize_metrics();
    if cli.lakebase_mode {
        installed_extensions::initialize_metrics();
        hadron_metrics::initialize_metrics();
    }

    let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;

    let config = get_config(&cli)?;

    let external_http_port = get_external_http_port(&cli);
    let internal_http_port = get_internal_http_port(&cli);

    let compute_node = ComputeNode::new(
        ComputeNodeParams {
            compute_id: cli.compute_id,
@@ -220,8 +254,8 @@ fn main() -> Result<()> {
            pgdata: cli.pgdata.clone(),
            pgbin: cli.pgbin.clone(),
            pgversion: get_pg_version_string(&cli.pgbin),
            external_http_port: cli.external_http_port,
            internal_http_port: cli.internal_http_port,
            external_http_port,
            internal_http_port,
            remote_ext_base_url: cli.remote_ext_base_url.clone(),
            resize_swap_on_bind: cli.resize_swap_on_bind,
            set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
@@ -245,7 +279,7 @@ fn main() -> Result<()> {
        config,
    )?;

    let exit_code = compute_node.run()?;
    let exit_code = compute_node.run().context("running compute node")?;

    scenario.teardown();
@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
    });

    let query = "
        INSERT INTO health_check VALUES (1, now())
        INSERT INTO public.health_check VALUES (1, pg_catalog.now())
        ON CONFLICT (id) DO UPDATE
        SET updated_at = now();";
        SET updated_at = pg_catalog.now();";

    match client.simple_query(query).await {
        Result::Ok(result) => {
@@ -6,7 +6,8 @@ use compute_api::responses::{
    LfcPrewarmState, PromoteState, TlsConfig,
};
use compute_api::spec::{
    ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
    ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, GenericOption,
    PageserverConnectionInfo, PageserverProtocol, PgIdent, Role,
};
use futures::StreamExt;
use futures::future::join_all;
@@ -31,13 +32,17 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
    DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
use utils::pid_file;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use utils::shard::{ShardIndex, ShardNumber, ShardStripeSize};

use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
@@ -191,6 +196,7 @@ pub struct ComputeState {
    pub startup_span: Option<tracing::span::Span>,

    pub lfc_prewarm_state: LfcPrewarmState,
    pub lfc_prewarm_token: CancellationToken,
    pub lfc_offload_state: LfcOffloadState,

    /// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
@@ -216,6 +222,7 @@ impl ComputeState {
            lfc_offload_state: LfcOffloadState::default(),
            terminate_flush_lsn: None,
            promote_state: None,
            lfc_prewarm_token: CancellationToken::new(),
        }
    }

@@ -248,7 +255,7 @@ pub struct ParsedSpec {
    pub spec: ComputeSpec,
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub pageserver_connstr: String,
    pub pageserver_conninfo: PageserverConnectionInfo,
    pub safekeeper_connstrings: Vec<String>,
    pub storage_auth_token: Option<String>,
    /// k8s dns name and port
@@ -296,25 +303,47 @@ impl ParsedSpec {
}

impl TryFrom<ComputeSpec> for ParsedSpec {
    type Error = String;
    fn try_from(spec: ComputeSpec) -> Result<Self, String> {
    type Error = anyhow::Error;
    fn try_from(spec: ComputeSpec) -> Result<Self, anyhow::Error> {
        // Extract the options from the spec file that are needed to connect to
        // the storage system.
        //
        // For backwards-compatibility, the top-level fields in the spec file
        // may be empty. In that case, we need to dig them from the GUCs in the
        // cluster.settings field.
        let pageserver_connstr = spec
            .pageserver_connstring
            .clone()
            .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
            .ok_or("pageserver connstr should be provided")?;
        // In compute specs generated by old control plane versions, the spec file might
        // be missing the `pageserver_connection_info` field. In that case, we need to dig
        // the pageserver connection info from the `pageserver_connstr` field instead, or
        // if that's missing too, from the GUC in the cluster.settings field.
        let mut pageserver_conninfo = spec.pageserver_connection_info.clone();
        if pageserver_conninfo.is_none() {
            if let Some(pageserver_connstr_field) = &spec.pageserver_connstring {
                pageserver_conninfo = Some(PageserverConnectionInfo::from_connstr(
                    pageserver_connstr_field,
                    spec.shard_stripe_size,
                )?);
            }
        }
        if pageserver_conninfo.is_none() {
            if let Some(guc) = spec.cluster.settings.find("neon.pageserver_connstring") {
                let stripe_size = if let Some(guc) = spec.cluster.settings.find("neon.stripe_size")
                {
                    Some(ShardStripeSize(u32::from_str(&guc)?))
                } else {
                    None
                };
                pageserver_conninfo =
                    Some(PageserverConnectionInfo::from_connstr(&guc, stripe_size)?);
            }
        }
        let pageserver_conninfo = pageserver_conninfo.ok_or(anyhow::anyhow!(
            "pageserver connection information should be provided"
        ))?;

        // Similarly for safekeeper connection strings
        let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
            if matches!(spec.mode, ComputeMode::Primary) {
                spec.cluster
                    .settings
                    .find("neon.safekeepers")
                    .ok_or("safekeeper connstrings should be provided")?
                    .ok_or(anyhow::anyhow!("safekeeper connstrings should be provided"))?
                    .split(',')
                    .map(|str| str.to_string())
                    .collect()
@@ -329,22 +358,22 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
        let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
            tenant_id
        } else {
            spec.cluster
            let guc = spec
                .cluster
                .settings
                .find("neon.tenant_id")
                .ok_or("tenant id should be provided")
                .map(|s| TenantId::from_str(&s))?
                .or(Err("invalid tenant id"))?
                .ok_or(anyhow::anyhow!("tenant id should be provided"))?;
            TenantId::from_str(&guc).context("invalid tenant id")?
        };
        let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
            timeline_id
        } else {
            spec.cluster
            let guc = spec
                .cluster
                .settings
                .find("neon.timeline_id")
                .ok_or("timeline id should be provided")
                .map(|s| TimelineId::from_str(&s))?
                .or(Err("invalid timeline id"))?
                .ok_or(anyhow::anyhow!("timeline id should be provided"))?;
            TimelineId::from_str(&guc).context(anyhow::anyhow!("invalid timeline id"))?
        };

        let endpoint_storage_addr: Option<String> = spec
@@ -358,7 +387,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {

        let res = ParsedSpec {
            spec,
            pageserver_connstr,
            pageserver_conninfo,
            safekeeper_connstrings,
            storage_auth_token,
            tenant_id,
@@ -368,7 +397,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
        };

        // Now check validity of the parsed specification
        res.validate()?;
        res.validate().map_err(anyhow::Error::msg)?;
        Ok(res)
    }
}
@@ -413,6 +442,66 @@ struct StartVmMonitorResult {
    vm_monitor: Option<JoinHandle<Result<()>>>,
}

// BEGIN_HADRON
/// This function creates roles that are used by Databricks.
/// These roles are not needs to be botostrapped at PG Compute provisioning time.
/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository.
pub(crate) fn create_databricks_roles() -> Vec<String> {
    let roles = vec![
        // Role for prometheus_stats_exporter
        Role {
            name: "databricks_monitor".to_string(),
            // This uses "local" connection and auth method for that is "trust", so no password is needed.
            encrypted_password: None,
            options: Some(vec![GenericOption {
                name: "IN ROLE pg_monitor".to_string(),
                value: None,
                vartype: "string".to_string(),
            }]),
        },
        // Role for brickstore control plane
        Role {
            name: "databricks_control_plane".to_string(),
            // Certificate user does not need password.
            encrypted_password: None,
            options: Some(vec![GenericOption {
                name: "SUPERUSER".to_string(),
                value: None,
                vartype: "string".to_string(),
            }]),
        },
        // Role for brickstore httpgateway.
        Role {
            name: "databricks_gateway".to_string(),
            // Certificate user does not need password.
            encrypted_password: None,
            options: None,
        },
    ];

    roles
        .into_iter()
        .map(|role| {
            let query = format!(
                r#"
                DO $$
                    BEGIN
                        IF NOT EXISTS (
                            SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
                        THEN
                            CREATE ROLE {} {};
                        END IF;
                    END
                $$;"#,
                role.name,
                role.name.pg_quote(),
                role.to_pg_options(),
            );
            query
        })
        .collect()
}
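For reference, a sketch of the statement the format! call above would emit for the first role. The exact output of pg_quote() (assumed here to double-quote the identifier) and to_pg_options() (assumed to render the single option as IN ROLE pg_monitor) is not shown in this diff, so treat the rendered values as illustrative.

// Illustrative only: `quoted` and `options` stand in for the real pg_quote() /
// to_pg_options() results, which may render differently in the codebase.
fn main() {
    let name = "databricks_monitor";
    let quoted = format!("\"{name}\"");
    let options = "IN ROLE pg_monitor";
    let query = format!(
        r#"
DO $$
    BEGIN
        IF NOT EXISTS (
            SELECT FROM pg_catalog.pg_roles WHERE rolname = '{name}')
        THEN
            CREATE ROLE {quoted} {options};
        END IF;
    END
$$;"#
    );
    // Prints the guarded CREATE ROLE block that would be sent to Postgres.
    println!("{query}");
}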

/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
pub struct DatabricksEnvVars {
    /// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
@@ -421,14 +510,27 @@ pub struct DatabricksEnvVars {
    /// Hostname of the Databricks workspace URL this compute instance belongs to.
    /// Used by postgres to verify Databricks PAT tokens.
    pub workspace_host: String,

    pub lakebase_mode: bool,
}

impl DatabricksEnvVars {
    pub fn new(compute_spec: &ComputeSpec, compute_id: Option<&String>) -> Self {
        // compute_id is a string format of "{endpoint_id}/{compute_idx}"
        // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
        // Panics if compute_id is not set or not in the expected format.
        let endpoint_id = compute_id.unwrap().split('/').next().unwrap().to_string();
    pub fn new(
        compute_spec: &ComputeSpec,
        compute_id: Option<&String>,
        instance_id: Option<String>,
        lakebase_mode: bool,
    ) -> Self {
        let endpoint_id = if let Some(instance_id) = instance_id {
            // Use instance_id as endpoint_id if it is set. This code path is for PuPr model.
            instance_id
        } else {
            // Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model.
            // compute_id is a string format of "{endpoint_id}/{compute_idx}"
            // endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
            // Panics if compute_id is not set or not in the expected format.
            compute_id.unwrap().split('/').next().unwrap().to_string()
        };
        let workspace_host = compute_spec
            .databricks_settings
            .as_ref()
@@ -437,6 +539,7 @@ impl DatabricksEnvVars {
        Self {
            endpoint_id,
            workspace_host,
            lakebase_mode,
        }
    }

@@ -446,6 +549,10 @@ impl DatabricksEnvVars {

    /// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
    pub fn to_env_var_list(self) -> Vec<(String, String)> {
        if !self.lakebase_mode {
            // In neon env, we don't need to pass down the env vars to postgres.
            return vec![];
        }
        vec![
            (
                Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
@@ -482,7 +589,7 @@ impl ComputeNode {
    // that can affect `compute_ctl` and prevent it from properly configuring the database schema.
    // Unset them via connection string options before connecting to the database.
    // N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
    const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
    const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0 -c pgaudit.log=none";
    let options = match conn_conf.get_options() {
        // Allow the control plane to override any options set by the
        // compute
@@ -495,7 +602,11 @@ impl ComputeNode {
        let mut new_state = ComputeState::new();
        if let Some(spec) = config.spec {
            let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
            new_state.pspec = Some(pspec);
            if params.lakebase_mode {
                ComputeNode::set_spec(&params, &mut new_state, pspec);
            } else {
                new_state.pspec = Some(pspec);
            }
        }

        Ok(ComputeNode {
@@ -1093,7 +1204,14 @@ impl ComputeNode {
        // If it is something different then create_dir() will error out anyway.
        let pgdata = &self.params.pgdata;
        let _ok = fs::remove_dir_all(pgdata);
        fs::create_dir(pgdata)?;
        if self.params.lakebase_mode {
            // Ignore creation errors if the directory already exists (e.g. mounting it ahead of time).
            // If it is something different then PG startup will error out anyway.
            let _ok = fs::create_dir(pgdata);
        } else {
            fs::create_dir(pgdata)?;
        }

        fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;

        Ok(())
@@ -1105,12 +1223,10 @@ impl ComputeNode {
    fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
        let spec = compute_state.pspec.as_ref().expect("spec must be set");

        let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
        let started = Instant::now();

        let (connected, size) = match PageserverProtocol::from_connstring(shard0_connstr)? {
            PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
        let (connected, size) = match spec.pageserver_conninfo.prefer_protocol {
            PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
            PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
        };

        self.fix_zenith_signal_neon_signal()?;
@@ -1148,23 +1264,20 @@ impl ComputeNode {
    /// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
    /// the connection was established, and the (compressed) size of the basebackup.
    fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
        let shard0_connstr = spec
            .pageserver_connstr
            .split(',')
            .next()
            .unwrap()
            .to_string();
        let shard_index = match spec.pageserver_connstr.split(',').count() as u8 {
            0 | 1 => ShardIndex::unsharded(),
            count => ShardIndex::new(ShardNumber(0), ShardCount(count)),
        let shard0_index = ShardIndex {
            shard_number: ShardNumber(0),
            shard_count: spec.pageserver_conninfo.shard_count,
        };

        let shard0_url = spec
            .pageserver_conninfo
            .shard_url(ShardNumber(0), PageserverProtocol::Grpc)?
            .to_owned();
        let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
            let mut client = page_api::Client::connect(
                shard0_connstr,
                shard0_url,
                spec.tenant_id,
                spec.timeline_id,
                shard_index,
                shard0_index,
                spec.storage_auth_token.clone(),
                None, // NB: base backups use payload compression
            )
@@ -1196,7 +1309,9 @@ impl ComputeNode {
    /// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp
    /// when the connection was established, and the (compressed) size of the basebackup.
    fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
        let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
        let shard0_connstr = spec
            .pageserver_conninfo
            .shard_url(ShardNumber(0), PageserverProtocol::Libpq)?;
        let mut config = postgres::Config::from_str(shard0_connstr)?;

        // Use the storage auth token from the config file, if given.
@@ -1283,10 +1398,7 @@ impl ComputeNode {
                    return result;
                }
                Err(ref e) if attempts < max_attempts => {
                    warn!(
                        "Failed to get basebackup: {} (attempt {}/{})",
                        e, attempts, max_attempts
                    );
                    warn!("Failed to get basebackup: {e:?} (attempt {attempts}/{max_attempts})");
                    std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64));
                    retry_period_ms *= 1.5;
                }
@@ -1448,6 +1560,41 @@ impl ComputeNode {
        Ok(lsn)
    }

    fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
        let max_retries = 5;
        let mut attempts = 0;
        loop {
            let result = self.sync_safekeepers(storage_auth_token.clone());
            match &result {
                Ok(_) => {
                    if attempts > 0 {
                        tracing::info!("sync_safekeepers succeeded after {attempts} retries");
                    }
                    return result;
                }
                Err(e) if attempts < max_retries => {
                    tracing::info!(
                        "sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
                    );
                }
                Err(err) => {
                    tracing::warn!(
                        "sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
                    );
                    return result;
                }
            }
            // sleep and retry
            let backoff = exponential_backoff_duration(
                attempts,
                DEFAULT_BASE_BACKOFF_SECONDS,
                DEFAULT_MAX_BACKOFF_SECONDS,
            );
            std::thread::sleep(backoff);
            attempts += 1;
        }
    }
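The retry loop relies on exponential_backoff_duration from the utils crate. As a rough illustration of what such a schedule looks like, here is a standalone sketch that assumes a conventional doubling-with-cap formula and hypothetical base/cap values; the real utils::backoff implementation may differ (for example by adding jitter).

use std::time::Duration;

// Assumed formula for illustration: base * 2^attempt, capped at `max_secs`.
fn backoff_duration(attempt: u32, base_secs: f64, max_secs: f64) -> Duration {
    let secs = (base_secs * 2f64.powi(attempt as i32)).min(max_secs);
    Duration::from_secs_f64(secs)
}

fn main() {
    // With a 0.1 s base and 3 s cap (hypothetical values), five retries wait roughly
    // 0.1 s, 0.2 s, 0.4 s, 0.8 s, and 1.6 s before giving up or succeeding.
    for attempt in 0..5 {
        println!("attempt {attempt}: sleep {:?}", backoff_duration(attempt, 0.1, 3.0));
    }
}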
|
||||
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
@@ -1483,7 +1630,7 @@ impl ComputeNode {
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
@@ -1499,16 +1646,8 @@ impl ComputeNode {
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
"getting basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
);
|
||||
self.get_basebackup(compute_state, lsn).with_context(|| {
|
||||
format!(
|
||||
"failed to get basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
)
|
||||
})?;
|
||||
self.get_basebackup(compute_state, lsn)
|
||||
.with_context(|| format!("failed to get basebackup@{lsn}"))?;
|
||||
|
||||
if let Some(settings) = databricks_settings {
|
||||
copy_tls_certificates(
|
||||
@@ -1572,7 +1711,7 @@ impl ComputeNode {
|
||||
// symlink doesn't affect anything.
|
||||
//
|
||||
// See https://github.com/neondatabase/autoscaling/issues/800
|
||||
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
|
||||
std::fs::remove_dir_all(pgdata_path.join("pg_dynshmem"))?;
|
||||
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
|
||||
|
||||
match spec.mode {
|
||||
@@ -1587,6 +1726,12 @@ impl ComputeNode {
|
||||
|
||||
/// Start and stop a postgres process to warm up the VM for startup.
|
||||
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
|
||||
if self.params.lakebase_mode {
|
||||
// We are running in Hadron mode. Disabling this prewarming step for now as it could run
|
||||
// into dblet port conflicts and also doesn't add much value with our current infra.
|
||||
info!("Skipping postgres prewarming in Hadron mode");
|
||||
return Ok(());
|
||||
}
|
||||
info!("prewarming VM memory");
|
||||
|
||||
// Create pgdata
|
||||
@@ -1648,7 +1793,12 @@ impl ComputeNode {
|
||||
let databricks_env_vars = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let spec = &state.pspec.as_ref().unwrap().spec;
|
||||
DatabricksEnvVars::new(spec, Some(&self.params.compute_id))
|
||||
DatabricksEnvVars::new(
|
||||
spec,
|
||||
Some(&self.params.compute_id),
|
||||
self.params.instance_id.clone(),
|
||||
self.params.lakebase_mode,
|
||||
)
|
||||
};
|
||||
|
||||
info!(
|
||||
@@ -1775,7 +1925,7 @@ impl ComputeNode {
|
||||
|
||||
// It doesn't matter what were the options before, here we just want
|
||||
// to connect and create a new superuser role.
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0";
|
||||
zenith_admin_conf.options(ZENITH_OPTIONS);
|
||||
|
||||
let mut client =
|
||||
@@ -1820,7 +1970,15 @@ impl ComputeNode {
|
||||
/// Do initial configuration of the already started Postgres.
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
|
||||
if self.params.lakebase_mode {
|
||||
// Set a 2-minute statement_timeout for the session applying config. The individual SQL statements
|
||||
// used in apply_spec_sql() should not take long (they are just creating users and installing
|
||||
// extensions). If any of them are stuck for an extended period of time it usually indicates a
|
||||
// pageserver connectivity problem and we should bail out.
|
||||
conf.options("-c statement_timeout=2min");
|
||||
}
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let spec = Arc::new(
|
||||
@@ -2138,7 +2296,17 @@ impl ComputeNode {
|
||||
pub fn check_for_core_dumps(&self) -> Result<()> {
|
||||
let core_dump_dir = match std::env::consts::OS {
|
||||
"macos" => Path::new("/cores/"),
|
||||
_ => Path::new(&self.params.pgdata),
|
||||
// BEGIN HADRON
|
||||
// NB: Read core dump files from a fixed location outside of
|
||||
// the data directory since `compute_ctl` wipes the data directory
|
||||
// across container restarts.
|
||||
_ => {
|
||||
if self.params.lakebase_mode {
|
||||
Path::new("/databricks/logs/brickstore")
|
||||
} else {
|
||||
Path::new(&self.params.pgdata)
|
||||
}
|
||||
} // END HADRON
|
||||
};
|
||||
|
||||
// Collect core dump paths if any
|
||||
@@ -2212,13 +2380,13 @@ impl ComputeNode {
|
||||
let result = client
|
||||
.simple_query(
|
||||
"SELECT
|
||||
row_to_json(pg_stat_statements)
|
||||
pg_catalog.row_to_json(pss)
|
||||
FROM
|
||||
pg_stat_statements
|
||||
public.pg_stat_statements pss
|
||||
WHERE
|
||||
userid != 'cloud_admin'::regrole::oid
|
||||
pss.userid != 'cloud_admin'::pg_catalog.regrole::pg_catalog.oid
|
||||
ORDER BY
|
||||
(mean_exec_time + mean_plan_time) DESC
|
||||
(pss.mean_exec_time + pss.mean_plan_time) DESC
|
||||
LIMIT 100",
|
||||
)
|
||||
.await;
|
||||
@@ -2346,11 +2514,11 @@ LIMIT 100",
|
||||
|
||||
// check the role grants first - to gracefully handle read-replicas.
|
||||
let select = "SELECT privilege_type
|
||||
FROM pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
|
||||
JOIN pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename = $1
|
||||
AND nspname = $2";
|
||||
FROM pg_catalog.pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) AS acl ON true
|
||||
JOIN pg_catalog.pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename OPERATOR(pg_catalog.=) $1::pg_catalog.name
|
||||
AND nspname OPERATOR(pg_catalog.=) $2::pg_catalog.name";
|
||||
let rows = db_client
|
||||
.query(select, &[role_name, schema_name])
|
||||
.await
|
||||
@@ -2419,8 +2587,9 @@ LIMIT 100",
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {query}"))?;
|
||||
} else {
|
||||
let query =
|
||||
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
|
||||
let query = format!(
|
||||
"CREATE EXTENSION IF NOT EXISTS {ext_name} WITH SCHEMA public VERSION {quoted_version}"
|
||||
);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
@@ -2451,7 +2620,7 @@ LIMIT 100",
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}

@@ -2470,7 +2639,7 @@ LIMIT 100",
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2523,22 +2692,22 @@ LIMIT 100",
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
let old_pageserver_conninfo = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.pageserver_conninfo
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
let pageserver_conninfo = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
.pageserver_conninfo;
unchanged = pageserver_conninfo == &old_pageserver_conninfo;
unchanged
})
.unwrap();
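The change above only swaps which field is compared; the underlying pattern is std's Condvar::wait_timeout_while, which re-checks a predicate under the lock until it turns false or the timeout fires. A minimal generic sketch of that pattern:

use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Block until the shared value differs from `old` or `timeout` elapses;
// returns true if the value changed.
fn wait_for_change(lock: &Mutex<String>, cvar: &Condvar, old: &str, timeout: Duration) -> bool {
    let guard = lock.lock().unwrap();
    let (guard, _timeout_result) = cvar
        .wait_timeout_while(guard, timeout, |current| current.as_str() == old)
        .unwrap();
    // True if the value changed (possibly right at the timeout boundary).
    guard.as_str() != old
}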
@@ -2611,7 +2780,7 @@ LIMIT 100",
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
if matches!(
prewarm_state,
LfcPrewarmState::Completed
LfcPrewarmState::Completed { .. }
| LfcPrewarmState::NotPrewarmed
| LfcPrewarmState::Skipped
) {
@@ -2796,7 +2965,10 @@ mod tests {

match ParsedSpec::try_from(spec.clone()) {
Ok(_p) => panic!("Failed to detect duplicate entry"),
Err(e) => assert!(e.starts_with("duplicate entry in safekeeper_connstrings:")),
Err(e) => assert!(
e.to_string()
.starts_with("duplicate entry in safekeeper_connstrings:")
),
};
}
}

@@ -7,18 +7,11 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, spawn};
use std::time::Instant;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}

/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,

@@ -27,7 +20,7 @@ struct EndpointStoragePair {

const KEY: &str = "lfc_state";
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
@@ -53,36 +46,8 @@ impl EndpointStoragePair {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
// If prewarm failed, we want to get overall number of segments as well as done ones.
|
||||
// However, this function should be reliable even if querying postgres failed.
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
|
||||
info!("requesting LFC prewarm state from postgres");
|
||||
let mut state = LfcPrewarmStateWithProgress::default();
|
||||
{
|
||||
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
|
||||
}
|
||||
|
||||
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
error!(%err, "connecting to postgres");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
let row = match client
|
||||
.query_one("select * from neon.get_prewarm_info()", &[])
|
||||
.await
|
||||
{
|
||||
Ok(row) => row,
|
||||
Err(err) => {
|
||||
error!(%err, "querying LFC prewarm status");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
state.total = row.try_get(0).unwrap_or_default();
|
||||
state.prewarmed = row.try_get(1).unwrap_or_default();
|
||||
state.skipped = row.try_get(2).unwrap_or_default();
|
||||
state
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
|
||||
self.state.lock().unwrap().lfc_prewarm_state.clone()
|
||||
}
|
||||
|
||||
pub fn lfc_offload_state(&self) -> LfcOffloadState {
|
||||
@@ -92,34 +57,35 @@ impl ComputeNode {
|
||||
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
|
||||
/// Has a failpoint "compute-prewarm"
|
||||
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
|
||||
let token: CancellationToken;
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
|
||||
let state = &mut self.state.lock().unwrap();
|
||||
token = state.lfc_prewarm_token.clone();
|
||||
if let LfcPrewarmState::Prewarming =
|
||||
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
crate::metrics::LFC_PREWARMS.inc();
|
||||
|
||||
let cloned = self.clone();
|
||||
let this = self.clone();
|
||||
spawn(async move {
|
||||
let state = match cloned.prewarm_impl(from_endpoint).await {
|
||||
Ok(true) => LfcPrewarmState::Completed,
|
||||
Ok(false) => {
|
||||
info!(
|
||||
"skipping LFC prewarm because LFC state is not found in endpoint storage"
|
||||
);
|
||||
LfcPrewarmState::Skipped
|
||||
}
|
||||
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
crate::metrics::LFC_PREWARM_ERRORS.inc();
|
||||
error!(%err, "could not prewarm LFC");
|
||||
LfcPrewarmState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
}
|
||||
let error = format!("{err:#}");
|
||||
LfcPrewarmState::Failed { error }
|
||||
}
|
||||
};
|
||||
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = state;
|
||||
let state = &mut this.state.lock().unwrap();
|
||||
if let LfcPrewarmState::Cancelled = prewarm_state {
|
||||
state.lfc_prewarm_token = CancellationToken::new();
|
||||
}
|
||||
state.lfc_prewarm_state = prewarm_state;
|
||||
});
|
||||
true
|
||||
}
|
||||
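The prewarm path above threads a CancellationToken through its await points with tokio::select!. A minimal sketch of that pattern, with a placeholder workload standing in for the storage download:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Run a cancellable step: None means the token fired first.
async fn cancellable_step(token: &CancellationToken) -> Option<&'static str> {
    tokio::select! {
        _ = token.cancelled() => None,
        _ = tokio::time::sleep(Duration::from_millis(100)) => Some("step finished"),
    }
}

// Usage: a cancel request calls token.cancel(); after a cancelled run the worker
// swaps in CancellationToken::new() so later prewarms start from a fresh token.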
@@ -131,55 +97,101 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
|
||||
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
|
||||
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
|
||||
async fn prewarm_impl(
|
||||
&self,
|
||||
from_endpoint: Option<String>,
|
||||
token: CancellationToken,
|
||||
) -> Result<LfcPrewarmState> {
|
||||
let EndpointStoragePair {
|
||||
url,
|
||||
token: storage_token,
|
||||
} = self.endpoint_storage_pair(from_endpoint)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fail::fail_point!("compute-prewarm", |_| {
|
||||
bail!("prewarm configured to fail because of a failpoint")
|
||||
});
|
||||
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
|
||||
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
let res = request.send().await.context("querying endpoint storage")?;
|
||||
match res.status() {
|
||||
let mut now = Instant::now();
|
||||
let request = Client::new().get(&url).bearer_auth(storage_token);
|
||||
let response = select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
response = request.send() => response
|
||||
}
|
||||
.context("querying endpoint storage")?;
|
||||
|
||||
match response.status() {
|
||||
StatusCode::OK => (),
|
||||
StatusCode::NOT_FOUND => {
|
||||
return Ok(false);
|
||||
}
|
||||
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
|
||||
status => bail!("{status} querying endpoint storage"),
|
||||
}
|
||||
let state_download_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let mut uncompressed = Vec::new();
|
||||
let lfc_state = res
|
||||
.bytes()
|
||||
.await
|
||||
.context("getting request body from endpoint storage")?;
|
||||
ZstdDecoder::new(lfc_state.iter().as_slice())
|
||||
.read_to_end(&mut uncompressed)
|
||||
.await
|
||||
.context("decoding LFC state")?;
|
||||
let lfc_state = select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
lfc_state = response.bytes() => lfc_state
|
||||
}
|
||||
.context("getting request body from endpoint storage")?;
|
||||
|
||||
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
|
||||
select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
read = decoder.read_to_end(&mut uncompressed) => read
|
||||
}
|
||||
.context("decoding LFC state")?;
|
||||
let uncompress_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let uncompressed_len = uncompressed.len();
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
|
||||
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
|
||||
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
// Client connection and prewarm info querying are fast and therefore don't need
|
||||
// cancellation
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
|
||||
.await
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())?;
|
||||
.context("connecting to postgres")?;
|
||||
let pg_token = client.cancel_token();
|
||||
|
||||
Ok(true)
|
||||
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
|
||||
select! {
|
||||
res = client.query_one("select neon.prewarm_local_cache($1)", ¶ms) => res,
|
||||
_ = token.cancelled() => {
|
||||
pg_token.cancel_query(postgres::NoTls).await
|
||||
.context("cancelling neon.prewarm_local_cache()")?;
|
||||
return Ok(LfcPrewarmState::Cancelled)
|
||||
}
|
||||
}
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())?;
|
||||
let prewarm_time_ms = now.elapsed().as_millis() as u32;
|
||||
|
||||
let row = client
|
||||
.query_one("select * from neon.get_prewarm_info()", &[])
|
||||
.await
|
||||
.context("querying prewarm info")?;
|
||||
let total = row.try_get(0).unwrap_or_default();
|
||||
let prewarmed = row.try_get(1).unwrap_or_default();
|
||||
let skipped = row.try_get(2).unwrap_or_default();
|
||||
|
||||
Ok(LfcPrewarmState::Completed {
|
||||
total,
|
||||
prewarmed,
|
||||
skipped,
|
||||
state_download_time_ms,
|
||||
uncompress_time_ms,
|
||||
prewarm_time_ms,
|
||||
})
|
||||
}
|
||||
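The download is decoded with an async zstd reader. A small round-trip sketch with the async-compression crate (assuming its "tokio" and "zstd" features are enabled) shows the same encode/decode calls used for the LFC state:

use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
use tokio::io::AsyncReadExt;

// Compress and then decompress a buffer, mirroring offload (encode) and prewarm (decode).
async fn zstd_roundtrip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut compressed = Vec::new();
    ZstdEncoder::new(data).read_to_end(&mut compressed).await?;

    let mut restored = Vec::new();
    ZstdDecoder::new(compressed.as_slice()).read_to_end(&mut restored).await?;
    Ok(restored)
}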
|
||||
/// If offload request is ongoing, return false, true otherwise
|
||||
pub fn offload_lfc(self: &Arc<Self>) -> bool {
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
|
||||
if matches!(
|
||||
replace(state, LfcOffloadState::Offloading),
|
||||
LfcOffloadState::Offloading
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -191,7 +203,10 @@ impl ComputeNode {
|
||||
pub async fn offload_lfc_async(self: &Arc<Self>) {
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
|
||||
if matches!(
|
||||
replace(state, LfcOffloadState::Offloading),
|
||||
LfcOffloadState::Offloading
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -200,23 +215,23 @@ impl ComputeNode {
|
||||
|
||||
async fn offload_lfc_with_state_update(&self) {
|
||||
crate::metrics::LFC_OFFLOADS.inc();
|
||||
|
||||
let Err(err) = self.offload_lfc_impl().await else {
|
||||
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
|
||||
return;
|
||||
};
|
||||
|
||||
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
|
||||
error!(%err, "could not offload LFC state to endpoint storage");
|
||||
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
let state = match self.offload_lfc_impl().await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
|
||||
error!(%err, "could not offload LFC");
|
||||
let error = format!("{err:#}");
|
||||
LfcOffloadState::Failed { error }
|
||||
}
|
||||
};
|
||||
self.state.lock().unwrap().lfc_offload_state = state;
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||
info!(%url, "requesting LFC state from Postgres");
|
||||
|
||||
let mut now = Instant::now();
|
||||
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
@@ -228,26 +243,41 @@ impl ComputeNode {
|
||||
.context("deserializing LFC state")?;
|
||||
let Some(state) = state else {
|
||||
info!(%url, "empty LFC state, not exporting");
|
||||
return Ok(());
|
||||
return Ok(LfcOffloadState::Skipped);
|
||||
};
|
||||
let state_query_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
ZstdEncoder::new(state)
|
||||
.read_to_end(&mut compressed)
|
||||
.await
|
||||
.context("compressing LFC state")?;
|
||||
let compress_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let compressed_len = compressed.len();
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}");
|
||||
|
||||
let request = Client::new().put(url).bearer_auth(token).body(compressed);
|
||||
match request.send().await {
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(()),
|
||||
Ok(res) => bail!(
|
||||
"Request to endpoint storage failed with status: {}",
|
||||
res.status()
|
||||
),
|
||||
Err(err) => Err(err).context("writing to endpoint storage"),
|
||||
let response = request
|
||||
.send()
|
||||
.await
|
||||
.context("writing to endpoint storage")?;
|
||||
let state_upload_time_ms = now.elapsed().as_millis() as u32;
|
||||
let status = response.status();
|
||||
if status != StatusCode::OK {
|
||||
bail!("request to endpoint storage failed: {status}");
|
||||
}
|
||||
|
||||
Ok(LfcOffloadState::Completed {
|
||||
compress_time_ms,
|
||||
state_query_time_ms,
|
||||
state_upload_time_ms,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn cancel_prewarm(self: &Arc<Self>) {
|
||||
self.state.lock().unwrap().lfc_prewarm_token.cancel();
|
||||
}
|
||||
}
|
||||
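The offload result now carries timing fields. The real type lives in compute_api::responses; purely as an illustration of the shape implied by the OpenAPI changes further down, a serde-tagged enum along these lines would serialize to the documented JSON:

use serde::Serialize;

// Illustrative only; variant and field names follow the spec below, not the real crate.
#[derive(Serialize, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
enum OffloadStateSketch {
    NotOffloaded,
    Offloading,
    Skipped,
    Completed {
        state_query_time_ms: u32,
        compress_time_ms: u32,
        state_upload_time_ms: u32,
    },
    Failed { error: String },
}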
|
||||
@@ -1,32 +1,24 @@
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, bail};
|
||||
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::time::sleep;
|
||||
use std::time::Instant;
|
||||
use tracing::info;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
impl ComputeNode {
|
||||
/// Returns only when promote fails or succeeds. If a network error occurs
|
||||
/// and http client disconnects, this does not stop promotion, and subsequent
|
||||
/// calls block until promote finishes.
|
||||
/// Returns only when promote fails or succeeds. If http client calling this function
|
||||
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
|
||||
/// Called by control plane on secondary after primary endpoint is terminated
|
||||
/// Has a failpoint "compute-promotion"
|
||||
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
|
||||
let cloned = self.clone();
|
||||
let promote_fn = async move || {
|
||||
let Err(err) = cloned.promote_impl(cfg).await else {
|
||||
return PromoteState::Completed;
|
||||
};
|
||||
tracing::error!(%err, "promoting");
|
||||
PromoteState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
|
||||
let this = self.clone();
|
||||
let promote_fn = async move || match this.promote_impl(cfg).await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
tracing::error!(%err, "promoting replica");
|
||||
let error = format!("{err:#}");
|
||||
PromoteState::Failed { error }
|
||||
}
|
||||
};
|
||||
|
||||
let start_promotion = || {
|
||||
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
|
||||
tokio::spawn(async move { tx.send(promote_fn().await) });
|
||||
@@ -34,36 +26,31 @@ impl ComputeNode {
|
||||
};
|
||||
|
||||
let mut task;
|
||||
// self.state is unlocked after block ends so we lock it in promote_impl
|
||||
// and task.changed() is reached
|
||||
// promote_impl locks self.state so we need to unlock it before calling task.changed()
|
||||
{
|
||||
task = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.promote_state
|
||||
.get_or_insert_with(start_promotion)
|
||||
.clone()
|
||||
let promote_state = &mut self.state.lock().unwrap().promote_state;
|
||||
task = promote_state.get_or_insert_with(start_promotion).clone()
|
||||
}
|
||||
if task.changed().await.is_err() {
|
||||
let error = "promote sender dropped".to_string();
|
||||
return PromoteState::Failed { error };
|
||||
}
|
||||
task.changed().await.expect("promote sender dropped");
|
||||
task.borrow().clone()
|
||||
}
|
||||
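promote() now spawns the work once and hands every caller the same tokio watch receiver, so a dropped HTTP connection cannot abort an in-flight promotion. A stripped-down sketch of that sharing pattern:

use tokio::sync::watch;

// First caller spawns the job; everyone awaits the same channel and reads the final value.
async fn run_shared_job() -> String {
    let (tx, mut rx) = watch::channel("not_started".to_string());
    tokio::spawn(async move {
        // long-running promotion work would happen here
        let _ = tx.send("completed".to_string());
    });
    let _ = rx.changed().await;
    rx.borrow().clone()
}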
|
||||
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
|
||||
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
let mode = &state.pspec.as_ref().unwrap().spec.mode;
|
||||
if *mode != ComputeMode::Replica {
|
||||
bail!("{} is not replica", mode.to_type_str());
|
||||
if *mode != compute_api::spec::ComputeMode::Replica {
|
||||
bail!("compute mode \"{}\" is not replica", mode.to_type_str());
|
||||
}
|
||||
|
||||
// we don't need to query Postgres so not self.lfc_prewarm_state()
|
||||
match &state.lfc_prewarm_state {
|
||||
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
|
||||
bail!("prewarm not requested or pending")
|
||||
status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
|
||||
bail!("compute {status}")
|
||||
}
|
||||
LfcPrewarmState::Failed { error } => {
|
||||
tracing::warn!(%error, "replica prewarm failed")
|
||||
tracing::warn!(%error, "compute prewarm failed")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -72,26 +59,29 @@ impl ComputeNode {
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?;
|
||||
let mut now = Instant::now();
|
||||
|
||||
let primary_lsn = cfg.wal_flush_lsn;
|
||||
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
|
||||
let mut standby_lsn = utils::lsn::Lsn::INVALID;
|
||||
const RETRIES: i32 = 20;
|
||||
for i in 0..=RETRIES {
|
||||
let row = client
|
||||
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
|
||||
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
|
||||
.await
|
||||
.context("getting last replay lsn")?;
|
||||
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
|
||||
last_wal_replay_lsn = lsn.into();
|
||||
if last_wal_replay_lsn >= primary_lsn {
|
||||
standby_lsn = lsn.into();
|
||||
if standby_lsn >= primary_lsn {
|
||||
break;
|
||||
}
|
||||
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
if last_wal_replay_lsn < primary_lsn {
|
||||
if standby_lsn < primary_lsn {
|
||||
bail!("didn't catch up with primary in {RETRIES} retries");
|
||||
}
|
||||
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
// using $1 doesn't work with ALTER SYSTEM SET
|
||||
let safekeepers_sql = format!(
|
||||
@@ -103,26 +93,32 @@ impl ComputeNode {
|
||||
.await
|
||||
.context("setting safekeepers")?;
|
||||
client
|
||||
.query("SELECT pg_reload_conf()", &[])
|
||||
.query(
|
||||
"ALTER SYSTEM SET synchronous_standby_names=walproposer",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.context("setting synchronous_standby_names")?;
|
||||
client
|
||||
.query("SELECT pg_catalog.pg_reload_conf()", &[])
|
||||
.await
|
||||
.context("reloading postgres config")?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fail::fail_point!("compute-promotion", |_| {
|
||||
bail!("promotion configured to fail because of a failpoint")
|
||||
});
|
||||
fail::fail_point!("compute-promotion", |_| bail!(
|
||||
"compute-promotion failpoint"
|
||||
));
|
||||
|
||||
let row = client
|
||||
.query_one("SELECT * FROM pg_promote()", &[])
|
||||
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
|
||||
.await
|
||||
.context("pg_promote")?;
|
||||
if !row.get::<usize, bool>(0) {
|
||||
bail!("pg_promote() returned false");
|
||||
bail!("pg_promote() failed");
|
||||
}
|
||||
let pg_promote_time_ms = now.elapsed().as_millis() as u32;
|
||||
let now = Instant::now();
|
||||
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?;
|
||||
let row = client
|
||||
.query_one("SHOW transaction_read_only", &[])
|
||||
.await
|
||||
@@ -131,36 +127,47 @@ impl ComputeNode {
|
||||
bail!("replica in read only mode after promotion");
|
||||
}
|
||||
|
||||
// Already checked validity in http handler
|
||||
#[allow(unused_mut)]
|
||||
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let spec = &mut state.pspec.as_mut().unwrap().spec;
|
||||
spec.mode = ComputeMode::Primary;
|
||||
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
|
||||
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
|
||||
Self::merge_spec(new_conf, existing_conf);
|
||||
|
||||
// Local setup has different ports for pg process (port=) for primary and secondary.
|
||||
// Primary is stopped so we need secondary's "port" value
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
let old_spec = &state.pspec.as_ref().unwrap().spec;
|
||||
let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
|
||||
bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
|
||||
};
|
||||
let set: std::collections::HashMap<&str, &str> = old_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
|
||||
let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
|
||||
bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
|
||||
};
|
||||
new_conf.push_str(&format!("port={}\n", set["port"]));
|
||||
}
|
||||
|
||||
tracing::debug!("applied spec: {:#?}", new_pspec.spec);
|
||||
if self.params.lakebase_mode {
|
||||
ComputeNode::set_spec(&self.params, &mut state, new_pspec);
|
||||
} else {
|
||||
state.pspec = Some(new_pspec);
|
||||
}
|
||||
}
|
||||
|
||||
info!("applied new spec, reconfiguring as primary");
|
||||
self.reconfigure()
|
||||
}
|
||||
self.reconfigure()?;
|
||||
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
|
||||
|
||||
/// Merge old and new Postgres conf specs to apply on secondary.
|
||||
/// Change new spec's port and safekeepers since they are supplied
|
||||
/// differently
|
||||
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
|
||||
let mut new_conf_set: HashMap<&str, &str> = new_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
new_conf_set.remove("neon.safekeepers");
|
||||
|
||||
let existing_conf_set: HashMap<&str, &str> = existing_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
new_conf_set.insert("port", existing_conf_set["port"]);
|
||||
*new_conf = new_conf_set
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}={v}"))
|
||||
.join("\n");
|
||||
Ok(PromoteState::Completed {
|
||||
lsn_wait_time_ms,
|
||||
pg_promote_time_ms,
|
||||
reconfigure_time_ms,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ use crate::pg_helpers::{
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};

use utils::shard::{ShardIndex, ShardNumber};

/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
@@ -63,15 +65,78 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "{conf}")?;
|
||||
}
|
||||
|
||||
// Stripe size GUC should be defined prior to connection string
|
||||
if let Some(stripe_size) = spec.shard_stripe_size {
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
// Add options for connecting to storage
|
||||
writeln!(file, "# Neon storage settings")?;
|
||||
if let Some(s) = &spec.pageserver_connstring {
|
||||
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
|
||||
writeln!(file)?;
|
||||
if let Some(conninfo) = &spec.pageserver_connection_info {
|
||||
// Stripe size GUC should be defined prior to connection string
|
||||
if let Some(stripe_size) = conninfo.stripe_size {
|
||||
writeln!(
|
||||
file,
|
||||
"# from compute spec's pageserver_connection_info.stripe_size field"
|
||||
)?;
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
|
||||
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
|
||||
let num_shards = if conninfo.shard_count.0 == 0 {
|
||||
1 // unsharded, treat it as a single shard
|
||||
} else {
|
||||
conninfo.shard_count.0
|
||||
};
|
||||
|
||||
for shard_number in 0..num_shards {
|
||||
let shard_index = ShardIndex {
|
||||
shard_number: ShardNumber(shard_number),
|
||||
shard_count: conninfo.shard_count,
|
||||
};
|
||||
let info = conninfo.shards.get(&shard_index).ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"shard {shard_index} missing from pageserver_connection_info shard map"
|
||||
)
|
||||
})?;
|
||||
|
||||
let first_pageserver = info
|
||||
.pageservers
|
||||
.first()
|
||||
.expect("must have at least one pageserver");
|
||||
|
||||
// Add the libpq URL to the array, or if the URL is missing, reset the array
|
||||
// forgetting any previous entries. All servers must have a libpq URL, or none
|
||||
// at all.
|
||||
if let Some(url) = &first_pageserver.libpq_url {
|
||||
if let Some(ref mut urls) = libpq_urls {
|
||||
urls.push(url.clone());
|
||||
}
|
||||
} else {
|
||||
libpq_urls = None
|
||||
}
|
||||
}
|
||||
if let Some(libpq_urls) = libpq_urls {
|
||||
writeln!(
|
||||
file,
|
||||
"# derived from compute spec's pageserver_connection_info field"
|
||||
)?;
|
||||
writeln!(
|
||||
file,
|
||||
"neon.pageserver_connstring={}",
|
||||
escape_conf_value(&libpq_urls.join(","))
|
||||
)?;
|
||||
} else {
|
||||
writeln!(file, "# no neon.pageserver_connstring")?;
|
||||
}
|
||||
} else {
|
||||
// Stripe size GUC should be defined prior to connection string
|
||||
if let Some(stripe_size) = spec.shard_stripe_size {
|
||||
writeln!(file, "# from compute spec's shard_stripe_size field")?;
|
||||
writeln!(file, "neon.stripe_size={stripe_size}")?;
|
||||
}
|
||||
if let Some(s) = &spec.pageserver_connstring {
|
||||
writeln!(file, "# from compute spec's pageserver_connstring field")?;
|
||||
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
|
||||
}
|
||||
}
|
||||
|
||||
if !spec.safekeeper_connstrings.is_empty() {
|
||||
let mut neon_safekeepers_value = String::new();
|
||||
tracing::info!(
|
||||
|
||||
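write_postgres_conf now derives neon.pageserver_connstring from the per-shard connection info: one libpq URL per shard, joined with commas, and the GUC is omitted if any shard lacks a URL. A reduced sketch of that join (shard ordering and names here are assumptions):

// Join one libpq URL per shard into the comma-separated GUC value, or return None
// if any shard has no libpq URL at all.
fn join_libpq_urls(urls_in_shard_order: &[Option<String>]) -> Option<String> {
    let mut joined = Vec::with_capacity(urls_in_shard_order.len());
    for url in urls_in_shard_order {
        joined.push(url.clone()?);
    }
    Some(joined.join(","))
}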
@@ -122,8 +122,11 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// into the type system.
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);

if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
== Some(pspec.pageserver_connstr.clone())
if state
.pspec
.as_ref()
.map(|ps| ps.pageserver_conninfo.clone())
== Some(pspec.pageserver_conninfo.clone())
{
info!(
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
@@ -139,6 +139,15 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled

/lfc/offload:
post:
@@ -608,9 +617,6 @@ components:
|
||||
type: object
|
||||
required:
|
||||
- status
|
||||
- total
|
||||
- prewarmed
|
||||
- skipped
|
||||
properties:
|
||||
status:
|
||||
description: LFC prewarm status
|
||||
@@ -628,6 +634,15 @@ components:
|
||||
skipped:
|
||||
description: Pages processed but not prewarmed
|
||||
type: integer
|
||||
state_download_time_ms:
|
||||
description: Time it takes to download LFC state to compute
|
||||
type: integer
|
||||
uncompress_time_ms:
|
||||
description: Time it takes to uncompress LFC state
|
||||
type: integer
|
||||
prewarm_time_ms:
|
||||
description: Time it takes to prewarm LFC state in Postgres
|
||||
type: integer
|
||||
|
||||
LfcOffloadState:
|
||||
type: object
|
||||
@@ -636,11 +651,21 @@ components:
|
||||
properties:
|
||||
status:
|
||||
description: LFC offload status
|
||||
enum: [not_offloaded, offloading, completed, failed]
|
||||
enum: [not_offloaded, offloading, completed, skipped, failed]
|
||||
type: string
|
||||
error:
|
||||
description: LFC offload error, if any
|
||||
type: string
|
||||
state_query_time_ms:
|
||||
description: Time it takes to get LFC state from Postgres
|
||||
type: integer
|
||||
compress_time_ms:
|
||||
description: Time it takes to compress LFC state
|
||||
type: integer
|
||||
state_upload_time_ms:
|
||||
description: Time it takes to upload LFC state to endpoint storage
|
||||
type: integer
|
||||
|
||||
|
||||
PromoteState:
|
||||
type: object
|
||||
@@ -654,6 +679,15 @@ components:
|
||||
error:
|
||||
description: Promote error, if any
|
||||
type: string
|
||||
lsn_wait_time_ms:
|
||||
description: Time it takes for secondary to catch up with primary WAL flush LSN
|
||||
type: integer
|
||||
pg_promote_time_ms:
|
||||
description: Time it takes to call pg_promote on secondary
|
||||
type: integer
|
||||
reconfigure_time_ms:
|
||||
description: Time it takes to reconfigure promoted secondary
|
||||
type: integer
|
||||
|
||||
SetRoleGrantsRequest:
|
||||
type: object
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use axum_extra::extract::OptionalQuery;
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
|
||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
|
||||
Json(compute.lfc_prewarm_state().await)
|
||||
}
|
||||
|
||||
@@ -46,3 +45,8 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
|
||||
compute.cancel_prewarm();
|
||||
StatusCode::ACCEPTED
|
||||
}
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
use crate::http::JsonResponse;
|
||||
use axum::extract::Json;
|
||||
use compute_api::responses::PromoteConfig;
|
||||
use http::StatusCode;
|
||||
|
||||
pub(in crate::http) async fn promote(
|
||||
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
|
||||
Json(cfg): Json<compute_api::responses::PromoteConfig>,
|
||||
Json(cfg): Json<PromoteConfig>,
|
||||
) -> axum::response::Response {
|
||||
// Return early at the cost of extra parsing spec
|
||||
let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
};
|
||||
|
||||
let cfg = PromoteConfig {
|
||||
spec: pspec.spec,
|
||||
wal_flush_lsn: cfg.wal_flush_lsn,
|
||||
};
|
||||
let state = compute.promote(cfg).await;
|
||||
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
|
||||
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
|
||||
|
||||
@@ -99,7 +99,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
);

let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
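With the DELETE route registered, a client can cancel an in-flight prewarm over HTTP. A hedged reqwest sketch; the base URL and any auth are deployment-specific and not shown in this diff:

use reqwest::{Client, StatusCode};

// Returns true if compute_ctl accepted the cancellation (202 Accepted).
async fn cancel_lfc_prewarm(base_url: &str) -> reqwest::Result<bool> {
    let response = Client::new()
        .delete(format!("{base_url}/lfc/prewarm"))
        .send()
        .await?;
    Ok(response.status() == StatusCode::ACCEPTED)
}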
@@ -19,7 +19,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
|
||||
.query(
|
||||
"SELECT datname FROM pg_catalog.pg_database
|
||||
WHERE datallowconn
|
||||
AND datconnlimit <> - 2
|
||||
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
|
||||
LIMIT 500",
|
||||
&[],
|
||||
)
|
||||
@@ -67,7 +67,7 @@ pub async fn get_installed_extensions(
|
||||
|
||||
let extensions: Vec<(String, String, i32)> = client
|
||||
.query(
|
||||
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
|
||||
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
|
||||
&[],
|
||||
)
|
||||
.await?
|
||||
|
||||
@@ -4,14 +4,13 @@ use std::thread;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use compute_api::spec::{ComputeMode, PageserverProtocol};
|
||||
use itertools::Itertools as _;
|
||||
use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
|
||||
use pageserver_page_api as page_api;
|
||||
use postgres::{NoTls, SimpleQueryMessage};
|
||||
use tracing::{info, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
@@ -78,17 +77,16 @@ fn acquire_lsn_lease_with_retry(
|
||||
|
||||
loop {
|
||||
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
|
||||
let (connstrings, auth) = {
|
||||
let (conninfo, auth) = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let spec = state.pspec.as_ref().expect("spec must be set");
|
||||
(
|
||||
spec.pageserver_connstr.clone(),
|
||||
spec.pageserver_conninfo.clone(),
|
||||
spec.storage_auth_token.clone(),
|
||||
)
|
||||
};
|
||||
|
||||
let result =
|
||||
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
|
||||
let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
|
||||
match result {
|
||||
Ok(Some(res)) => {
|
||||
return Ok(res);
|
||||
@@ -112,35 +110,44 @@ fn acquire_lsn_lease_with_retry(
|
||||
|
||||
/// Tries to acquire LSN leases on all Pageserver shards.
|
||||
fn try_acquire_lsn_lease(
|
||||
connstrings: &str,
|
||||
conninfo: PageserverConnectionInfo,
|
||||
auth: Option<&str>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<Option<SystemTime>> {
|
||||
let connstrings = connstrings.split(',').collect_vec();
|
||||
let shard_count = connstrings.len();
|
||||
let mut leases = Vec::new();
|
||||
|
||||
for (shard_number, &connstring) in connstrings.iter().enumerate() {
|
||||
let tenant_shard_id = match shard_count {
|
||||
0 | 1 => TenantShardId::unsharded(tenant_id),
|
||||
shard_count => TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(shard_number as u8),
|
||||
shard_count: ShardCount::new(shard_count as u8),
|
||||
},
|
||||
for (shard_index, shard) in conninfo.shards.into_iter() {
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: shard_index.shard_number,
|
||||
shard_count: shard_index.shard_count,
|
||||
};
|
||||
|
||||
let lease = match PageserverProtocol::from_connstring(connstring)? {
|
||||
PageserverProtocol::Libpq => {
|
||||
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
|
||||
}
|
||||
PageserverProtocol::Grpc => {
|
||||
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
|
||||
}
|
||||
};
|
||||
leases.push(lease);
|
||||
// XXX: If there is more than one pageserver for a shard, do we need to get a
// lease on all of them? Currently, that's what we assume, but this is hypothetical
// as of this writing, as we never pass the info for more than one pageserver per
// shard.
|
||||
for pageserver in shard.pageservers {
|
||||
let lease = match conninfo.prefer_protocol {
|
||||
PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
|
||||
&pageserver.grpc_url.unwrap(),
|
||||
auth,
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
)?,
|
||||
PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
|
||||
&pageserver.libpq_url.unwrap(),
|
||||
auth,
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
)?,
|
||||
};
|
||||
leases.push(lease);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(leases.into_iter().min().flatten())
|
||||
|
||||
@@ -76,7 +76,7 @@ impl<'m> MigrationRunner<'m> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
@@ -15,17 +15,17 @@ DO $$
|
||||
DECLARE
|
||||
role_name text;
|
||||
BEGIN
|
||||
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member')
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I INHERIT;', role_name);
|
||||
END LOOP;
|
||||
|
||||
FOR role_name IN SELECT rolname FROM pg_roles
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles
|
||||
WHERE
|
||||
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
|
||||
NOT pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT pg_catalog.starts_with(rolname, 'pg_')
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I NOBYPASSRLS;', role_name);
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
|
||||
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name = 'server_version_num') THEN
|
||||
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -5,9 +5,9 @@ DO $$
|
||||
DECLARE
|
||||
role_name TEXT;
|
||||
BEGIN
|
||||
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE rolreplication IS TRUE
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I NOREPLICATION;', role_name);
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
|
||||
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'server_version_num'::pg_catalog.text) THEN
|
||||
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
|
||||
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
|
||||
END IF;
|
||||
|
||||
@@ -2,7 +2,7 @@ DO $$
|
||||
DECLARE
|
||||
bypassrls boolean;
|
||||
BEGIN
|
||||
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
|
||||
SELECT rolbypassrls INTO bypassrls FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser';
|
||||
IF NOT bypassrls THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
|
||||
END IF;
|
||||
|
||||
@@ -4,8 +4,8 @@ DECLARE
|
||||
BEGIN
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolinherit AS inherit
|
||||
FROM pg_roles
|
||||
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
LOOP
|
||||
IF NOT role.inherit THEN
|
||||
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
|
||||
@@ -14,12 +14,12 @@ BEGIN
|
||||
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolbypassrls AS bypassrls
|
||||
FROM pg_roles
|
||||
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
AND NOT starts_with(rolname, 'pg_')
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE NOT pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
AND NOT pg_catalog.starts_with(rolname, 'pg_')
|
||||
LOOP
|
||||
IF role.bypassrls THEN
|
||||
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
|
||||
RAISE EXCEPTION '% can bypass RLS', pg_catalog.quote_ident(role.name);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
|
||||
IF (SELECT pg_catalog.current_setting('server_version_num')::pg_catalog.numeric < 160000) THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
|
||||
IF NOT (SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -2,12 +2,12 @@ DO $$
|
||||
DECLARE
|
||||
monitor record;
|
||||
BEGIN
|
||||
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
|
||||
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO monitor
|
||||
FROM pg_auth_members
|
||||
WHERE roleid = 'pg_monitor'::regrole
|
||||
AND member = 'neon_superuser'::regrole;
|
||||
FROM pg_catalog.pg_auth_members
|
||||
WHERE roleid = 'pg_monitor'::pg_catalog.regrole
|
||||
AND member = 'neon_superuser'::pg_catalog.regrole;
|
||||
|
||||
IF monitor IS NULL THEN
|
||||
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';
|
||||
|
||||
@@ -2,11 +2,11 @@ DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
|
||||
SELECT pg_catalog.bool_and(pg_catalog.has_function_privilege('neon_superuser', oid, 'execute'))
|
||||
INTO can_execute
|
||||
FROM pg_proc
|
||||
FROM pg_catalog.pg_proc
|
||||
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
AND pronamespace = 'pg_catalog'::pg_catalog.regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
|
||||
END IF;
|
||||
|
||||
@@ -2,9 +2,9 @@ DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT has_function_privilege('neon_superuser', oid, 'execute')
|
||||
SELECT pg_catalog.has_function_privilege('neon_superuser', oid, 'execute')
|
||||
INTO can_execute
|
||||
FROM pg_proc
|
||||
FROM pg_catalog.pg_proc
|
||||
WHERE proname = 'pg_show_replication_origin_status'
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
|
||||
@@ -2,10 +2,10 @@ DO $$
|
||||
DECLARE
|
||||
signal_backend record;
|
||||
BEGIN
|
||||
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
|
||||
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO signal_backend
|
||||
FROM pg_auth_members
|
||||
FROM pg_catalog.pg_auth_members
|
||||
WHERE roleid = 'pg_signal_backend'::regrole
|
||||
AND member = 'neon_superuser'::regrole;
|
||||
|
||||
|
||||
@@ -407,9 +407,9 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
|
||||
// like `postgres_exporter` use it to query Postgres statistics.
|
||||
// Use explicit 8 bytes type casts to match Rust types.
|
||||
let stats = cli.query_one(
|
||||
"SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
|
||||
coalesce(sum(sessions), 0)::bigint AS total_sessions
|
||||
FROM pg_stat_database
|
||||
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
|
||||
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
|
||||
FROM pg_catalog.pg_stat_database
|
||||
WHERE datname NOT IN (
|
||||
'postgres',
|
||||
'template0',
|
||||
@@ -445,11 +445,11 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
|
||||
let mut last_active: Option<DateTime<Utc>> = None;
|
||||
// Get all running client backends except ourself, use RFC3339 DateTime format.
|
||||
let backends = cli.query(
|
||||
"SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
|
||||
"SELECT state, pg_catalog.to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"'::pg_catalog.text) AS state_change
|
||||
FROM pg_stat_activity
|
||||
WHERE backend_type = 'client backend'
|
||||
AND pid != pg_backend_pid()
|
||||
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
|
||||
WHERE backend_type OPERATOR(pg_catalog.=) 'client backend'::pg_catalog.text
|
||||
AND pid OPERATOR(pg_catalog.!=) pg_catalog.pg_backend_pid()
|
||||
AND usename OPERATOR(pg_catalog.!=) 'cloud_admin'::pg_catalog.name;", // XXX: find a better way to filter other monitors?
|
||||
&[],
|
||||
);
|
||||
|
||||
|
||||
@@ -299,9 +299,9 @@ pub async fn get_existing_dbs_async(
|
||||
.query_raw::<str, &String, &[String; 0]>(
|
||||
"SELECT
|
||||
datname AS name,
|
||||
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
|
||||
(SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
|
||||
NOT datallowconn AS restrict_conn,
|
||||
datconnlimit = - 2 AS invalid
|
||||
datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
|
||||
FROM
|
||||
pg_catalog.pg_database;",
|
||||
&[],
|
||||
|
||||
@@ -13,17 +13,19 @@ use tokio_postgres::Client;
|
||||
use tokio_postgres::error::SqlState;
|
||||
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
|
||||
|
||||
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState};
|
||||
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles};
|
||||
use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS;
|
||||
use crate::pg_helpers::{
|
||||
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
|
||||
get_existing_roles_async,
|
||||
};
|
||||
use crate::spec_apply::ApplySpecPhase::{
|
||||
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
|
||||
AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles,
|
||||
CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension,
|
||||
CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
|
||||
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
|
||||
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
|
||||
RunInEachDatabase,
|
||||
HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions,
|
||||
RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
|
||||
};
|
||||
use crate::spec_apply::PerDatabasePhase::{
|
||||
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions,
|
||||
@@ -80,7 +82,7 @@ impl ComputeNode {
|
||||
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
|
||||
|
||||
drop_subscriptions_done = match
|
||||
client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
|
||||
client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
|
||||
Ok(result) => !result.is_empty(),
|
||||
Err(e) =>
|
||||
{
|
||||
@@ -166,6 +168,7 @@ impl ComputeNode {
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
[DropLogicalSubscriptions].to_vec(),
|
||||
self.params.lakebase_mode,
|
||||
);
|
||||
|
||||
Ok(tokio::spawn(fut))
|
||||
@@ -186,15 +189,33 @@ impl ComputeNode {
|
||||
};
|
||||
}
|
||||
|
||||
for phase in [
|
||||
CreatePrivilegedRole,
|
||||
let phases = if self.params.lakebase_mode {
|
||||
vec![
|
||||
CreatePrivilegedRole,
|
||||
// BEGIN_HADRON
|
||||
CreateDatabricksRoles,
|
||||
AlterDatabricksRoles,
|
||||
// END_HADRON
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
RenameAndDeleteDatabases,
|
||||
CreateAndAlterDatabases,
|
||||
CreateSchemaNeon,
|
||||
] {
|
||||
]
|
||||
} else {
|
||||
vec![
|
||||
CreatePrivilegedRole,
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
RenameAndDeleteDatabases,
|
||||
CreateAndAlterDatabases,
|
||||
CreateSchemaNeon,
|
||||
]
|
||||
};
|
||||
|
||||
for phase in phases {
|
||||
info!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
params.clone(),
|
||||
@@ -203,6 +224,7 @@ impl ComputeNode {
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
self.params.lakebase_mode,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -254,6 +276,7 @@ impl ComputeNode {
|
||||
concurrency_token.clone(),
|
||||
db,
|
||||
phases,
|
||||
self.params.lakebase_mode,
|
||||
);
|
||||
|
||||
Ok(tokio::spawn(fut))
|
||||
@@ -265,12 +288,28 @@ impl ComputeNode {
|
||||
handle.await??;
|
||||
}
|
||||
|
||||
let mut phases = vec![
|
||||
let mut phases = if self.params.lakebase_mode {
|
||||
vec![
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension, // This step depends on CreateSchemaNeon
|
||||
// BEGIN_HADRON
|
||||
HandleDatabricksAuthExtension,
|
||||
// END_HADRON
|
||||
CreateAvailabilityCheck,
|
||||
DropRoles,
|
||||
// BEGIN_HADRON
|
||||
AddDatabricksGrants,
|
||||
CreateDatabricksMisc,
|
||||
// END_HADRON
|
||||
]
|
||||
} else {
|
||||
vec![
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension, // This step depends on CreateSchemaNeon
|
||||
CreateAvailabilityCheck,
|
||||
DropRoles,
|
||||
];
|
||||
]
|
||||
};
|
||||
|
||||
// This step depends on CreateSchemaNeon
|
||||
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
|
||||
@@ -303,6 +342,7 @@ impl ComputeNode {
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
self.params.lakebase_mode,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -328,6 +368,7 @@ impl ComputeNode {
|
||||
concurrency_token: Arc<tokio::sync::Semaphore>,
|
||||
db: DB,
|
||||
subphases: Vec<PerDatabasePhase>,
|
||||
lakebase_mode: bool,
|
||||
) -> Result<()> {
|
||||
let _permit = concurrency_token.acquire().await?;
|
||||
|
||||
@@ -355,6 +396,7 @@ impl ComputeNode {
|
||||
let client = client_conn.as_ref().unwrap();
|
||||
Ok(client)
|
||||
},
|
||||
lakebase_mode,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -477,6 +519,10 @@ pub enum PerDatabasePhase {
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ApplySpecPhase {
|
||||
CreatePrivilegedRole,
|
||||
// BEGIN_HADRON
|
||||
CreateDatabricksRoles,
|
||||
AlterDatabricksRoles,
|
||||
// END_HADRON
|
||||
DropInvalidDatabases,
|
||||
RenameRoles,
|
||||
CreateAndAlterRoles,
|
||||
@@ -489,7 +535,14 @@ pub enum ApplySpecPhase {
|
||||
DisablePostgresDBPgAudit,
|
||||
HandleOtherExtensions,
|
||||
HandleNeonExtension,
|
||||
// BEGIN_HADRON
|
||||
HandleDatabricksAuthExtension,
|
||||
// END_HADRON
|
||||
CreateAvailabilityCheck,
|
||||
// BEGIN_HADRON
|
||||
AddDatabricksGrants,
|
||||
CreateDatabricksMisc,
|
||||
// END_HADRON
|
||||
DropRoles,
|
||||
FinalizeDropLogicalSubscriptions,
|
||||
}
|
||||
@@ -525,6 +578,7 @@ pub async fn apply_operations<'a, Fut, F>(
|
||||
jwks_roles: Arc<HashSet<String>>,
|
||||
apply_spec_phase: ApplySpecPhase,
|
||||
client: F,
|
||||
lakebase_mode: bool,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
@@ -571,6 +625,23 @@ where
|
||||
},
|
||||
query
|
||||
);
|
||||
if !lakebase_mode {
|
||||
return res;
|
||||
}
|
||||
// BEGIN HADRON
|
||||
if let Err(e) = res.as_ref() {
|
||||
if let Some(sql_state) = e.code() {
|
||||
if sql_state.code() == "57014" {
|
||||
// SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts.
|
||||
// Increment the counter whenever a statement timeout occurs. Timeouts on
|
||||
// this configuration path can only occur due to PS connectivity problems that
|
||||
// Postgres failed to recover from.
|
||||
COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
// END HADRON
|
||||
|
||||
res
|
||||
}
|
||||
.instrument(inspan)
|
||||
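The Hadron branch above keys off SQLSTATE 57014 to count statement timeouts. With tokio_postgres that check can be written against the SqlState constant rather than the raw string, as in this small sketch:

use tokio_postgres::error::{Error, SqlState};

// True when the error is ERRCODE_QUERY_CANCELED (SQLSTATE 57014), which is what a
// statement_timeout produces.
fn is_statement_timeout(err: &Error) -> bool {
    err.code() == Some(&SqlState::QUERY_CANCELED)
}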
@@ -608,10 +679,44 @@ async fn get_operations<'a>(
|
||||
ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
|
||||
query: format!(
|
||||
include_str!("sql/create_privileged_role.sql"),
|
||||
privileged_role_name = params.privileged_role_name
|
||||
privileged_role_name = params.privileged_role_name,
|
||||
privileges = if params.lakebase_mode {
|
||||
"CREATEDB CREATEROLE NOLOGIN BYPASSRLS"
|
||||
} else {
|
||||
"CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS"
|
||||
}
|
||||
),
|
||||
comment: None,
|
||||
}))),
|
||||
// BEGIN_HADRON
|
||||
// New Hadron phase
|
||||
ApplySpecPhase::CreateDatabricksRoles => {
|
||||
let queries = create_databricks_roles();
|
||||
let operations = queries.into_iter().map(|query| Operation {
|
||||
query,
|
||||
comment: None,
|
||||
});
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
|
||||
// Backfill existing databricks_reader_* roles with statement timeout from GUC
|
||||
ApplySpecPhase::AlterDatabricksRoles => {
|
||||
let query = String::from(include_str!(
|
||||
"sql/alter_databricks_reader_roles_timeout.sql"
|
||||
));
|
||||
|
||||
let operations = once(Operation {
|
||||
query,
|
||||
comment: Some(
|
||||
"Backfill existing databricks_reader_* roles with statement timeout"
|
||||
.to_string(),
|
||||
),
|
||||
});
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
// End of new Hadron Phase
|
||||
// END_HADRON
|
||||
ApplySpecPhase::DropInvalidDatabases => {
|
||||
let mut ctx = ctx.write().await;
|
||||
let databases = &mut ctx.dbs;
|
||||
@@ -981,7 +1086,10 @@ async fn get_operations<'a>(
|
||||
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
role_name = escaped_role,
|
||||
outer_tag = outer_tag,
|
||||
),
|
||||
)
|
||||
// HADRON change:
|
||||
.replace("neon_superuser", ¶ms.privileged_role_name),
|
||||
// HADRON change end ,
|
||||
comment: None,
|
||||
},
|
||||
// This now will only drop privileges of the role
|
||||
@@ -1017,7 +1125,8 @@ async fn get_operations<'a>(
|
||||
comment: None,
|
||||
},
|
||||
Operation {
|
||||
query: String::from(include_str!("sql/default_grants.sql")),
|
||||
query: String::from(include_str!("sql/default_grants.sql"))
|
||||
.replace("neon_superuser", ¶ms.privileged_role_name),
|
||||
comment: None,
|
||||
},
|
||||
]
|
||||
@@ -1033,7 +1142,9 @@ async fn get_operations<'a>(
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
if libs.contains("pg_stat_statements") {
|
||||
return Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
|
||||
query: String::from(
|
||||
"CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
|
||||
),
|
||||
comment: Some(String::from("create system extensions")),
|
||||
})));
|
||||
}
|
||||
@@ -1041,11 +1152,13 @@ async fn get_operations<'a>(
|
||||
Ok(Box::new(empty()))
|
||||
}
|
||||
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
|
||||
comment: Some(String::from("create pgaudit extensions")),
|
||||
}))),
|
||||
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
|
||||
query: String::from(
|
||||
"CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
|
||||
),
|
||||
comment: Some(String::from("create pgauditlogtofile extensions")),
|
||||
}))),
|
||||
// Disable pgaudit logging for postgres database.
|
||||
@@ -1069,7 +1182,7 @@ async fn get_operations<'a>(
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
|
||||
"UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
|
||||
),
|
||||
comment: Some(String::from("compat/fix: make neon relocatable")),
|
||||
},
|
||||
@@ -1086,6 +1199,28 @@ async fn get_operations<'a>(
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
// BEGIN_HADRON
|
||||
// Note: we may want to version the extension someday, but for now we just drop it and recreate it.
|
||||
ApplySpecPhase::HandleDatabricksAuthExtension => {
|
||||
let operations = vec![
|
||||
Operation {
|
||||
query: String::from("DROP EXTENSION IF EXISTS databricks_auth"),
|
||||
comment: Some(String::from("dropping existing databricks_auth extension")),
|
||||
},
|
||||
Operation {
|
||||
query: String::from("CREATE EXTENSION databricks_auth"),
|
||||
comment: Some(String::from("creating databricks_auth extension")),
|
||||
},
|
||||
Operation {
|
||||
query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"),
|
||||
comment: Some(String::from("grant select on databricks auth counters")),
|
||||
},
|
||||
]
|
||||
.into_iter();
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
// END_HADRON
|
||||
ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
|
||||
query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
|
||||
comment: None,
|
||||
@@ -1103,6 +1238,63 @@ async fn get_operations<'a>(
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
|
||||
// BEGIN_HADRON
|
||||
// New Hadron phases
|
||||
//
|
||||
// Grants permissions to roles that are used by Databricks.
|
||||
ApplySpecPhase::AddDatabricksGrants => {
|
||||
let operations = vec![
|
||||
Operation {
|
||||
query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"),
|
||||
comment: Some(String::from(
|
||||
"Permissions needed to execute neon.* functions (in the postgres database)",
|
||||
)),
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor",
|
||||
),
|
||||
comment: Some(String::from("Permissions needed for read and write probes")),
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor",
|
||||
),
|
||||
comment: Some(String::from(
|
||||
"Permissions needed to monitor .snap file counts",
|
||||
)),
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor",
|
||||
),
|
||||
comment: Some(String::from(
|
||||
"Permissions needed to access neon performance counters view",
|
||||
)),
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor",
|
||||
),
|
||||
comment: Some(String::from(
|
||||
"Permissions needed to execute the underlying performance counters function",
|
||||
)),
|
||||
},
|
||||
]
|
||||
.into_iter();
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
// Creates minor objects that are used by Databricks.
|
||||
ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation {
|
||||
query: String::from(include_str!("sql/create_databricks_misc.sql")),
|
||||
comment: Some(String::from(
|
||||
"The function databricks_monitor uses to convert exception to 0 or 1",
|
||||
)),
|
||||
}))),
|
||||
// End of new Hadron phases
|
||||
// END_HADRON
|
||||
ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
|
||||
query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
|
||||
comment: None,
|
||||
|
||||
@@ -3,16 +3,17 @@ BEGIN
|
||||
IF NOT EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE tablename = 'health_check'
|
||||
WHERE tablename::pg_catalog.name OPERATOR(pg_catalog.=) 'health_check'::pg_catalog.name
|
||||
AND schemaname::pg_catalog.name OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
|
||||
)
|
||||
THEN
|
||||
CREATE TABLE health_check (
|
||||
id serial primary key,
|
||||
updated_at timestamptz default now()
|
||||
CREATE TABLE public.health_check (
|
||||
id pg_catalog.int4 primary key generated by default as identity,
|
||||
updated_at pg_catalog.timestamptz default pg_catalog.now()
|
||||
);
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = now();
|
||||
SET updated_at = pg_catalog.now();
|
||||
END IF;
|
||||
END
|
||||
$$
|
||||
@@ -0,0 +1,25 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
reader_role RECORD;
|
||||
timeout_value TEXT;
|
||||
BEGIN
|
||||
-- Get the current GUC setting for reader statement timeout
|
||||
SELECT current_setting('databricks.reader_statement_timeout', true) INTO timeout_value;
|
||||
|
||||
-- Only proceed if timeout_value is not null/empty and not '0' (disabled)
|
||||
IF timeout_value IS NOT NULL AND timeout_value != '' AND timeout_value != '0' THEN
|
||||
-- Find all databricks_reader_* roles and update their statement_timeout
|
||||
FOR reader_role IN
|
||||
SELECT r.rolname
|
||||
FROM pg_roles r
|
||||
WHERE r.rolname ~ '^databricks_reader_\d+$'
|
||||
LOOP
|
||||
-- Apply the timeout setting to the role (will overwrite existing setting)
|
||||
EXECUTE format('ALTER ROLE %I SET statement_timeout = %L',
|
||||
reader_role.rolname, timeout_value);
|
||||
|
||||
RAISE LOG 'Updated statement_timeout = % for role %', timeout_value, reader_role.rolname;
|
||||
END LOOP;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
@@ -1,12 +0,0 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
query varchar;
|
||||
BEGIN
|
||||
FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};'
|
||||
FROM pg_proc p
|
||||
JOIN pg_namespace nsp ON p.pronamespace = nsp.oid
|
||||
WHERE nsp.nspname = 'anon' LOOP
|
||||
EXECUTE query;
|
||||
END LOOP;
|
||||
END
|
||||
$$;
|
||||
compute_tools/src/sql/create_databricks_misc.sql (new file, 15 lines)
@@ -0,0 +1,15 @@
|
||||
ALTER ROLE databricks_monitor SET statement_timeout = '60s';
|
||||
|
||||
CREATE OR REPLACE FUNCTION health_check_write_succeeds()
|
||||
RETURNS INTEGER AS $$
|
||||
BEGIN
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = now();
|
||||
|
||||
RETURN 1;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE EXCEPTION '[DATABRICKS_SMGR] health_check failed: [%] %', SQLSTATE, SQLERRM;
|
||||
RETURN 0;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
@@ -1,8 +1,8 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) '{privileged_role_name}'::pg_catalog.name)
|
||||
THEN
|
||||
CREATE ROLE {privileged_role_name} CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
CREATE ROLE {privileged_role_name} {privileges} IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
@@ -4,14 +4,14 @@ $$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname = 'public'
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'
|
||||
) AND
|
||||
current_setting('server_version_num')::int / 10000 >= 15
|
||||
pg_catalog.current_setting('server_version_num')::int OPERATOR(pg_catalog./) 10000 OPERATOR(pg_catalog.>=) 15
|
||||
THEN
|
||||
IF EXISTS(
|
||||
SELECT rolname
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE rolname = 'web_access'
|
||||
WHERE rolname OPERATOR(pg_catalog.=) 'web_access'
|
||||
)
|
||||
THEN
|
||||
GRANT CREATE ON SCHEMA public TO web_access;
|
||||
@@ -20,7 +20,7 @@ $$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname = 'public'
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'
|
||||
)
|
||||
THEN
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;
|
||||
|
||||
@@ -2,11 +2,17 @@ DO ${outer_tag}$
|
||||
DECLARE
|
||||
subname TEXT;
|
||||
BEGIN
|
||||
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
|
||||
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
|
||||
LOCK TABLE pg_catalog.pg_subscription IN ACCESS EXCLUSIVE MODE;
|
||||
FOR subname IN
|
||||
SELECT pg_subscription.subname
|
||||
FROM pg_catalog.pg_subscription
|
||||
WHERE subdbid OPERATOR(pg_catalog.=) (
|
||||
SELECT oid FROM pg_database WHERE datname OPERATOR(pg_catalog.=) {datname_str}::pg_catalog.name
|
||||
)
|
||||
LOOP
|
||||
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
EXECUTE pg_catalog.format('DROP SUBSCRIPTION %I;', subname);
|
||||
END LOOP;
|
||||
END;
|
||||
${outer_tag}$;
|
||||
|
||||
@@ -3,19 +3,19 @@ BEGIN
|
||||
IF NOT EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE tablename = 'drop_subscriptions_done'
|
||||
AND schemaname = 'neon'
|
||||
WHERE tablename OPERATOR(pg_catalog.=) 'drop_subscriptions_done'::pg_catalog.name
|
||||
AND schemaname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name
|
||||
)
|
||||
THEN
|
||||
CREATE TABLE neon.drop_subscriptions_done
|
||||
(id serial primary key, timeline_id text);
|
||||
(id pg_catalog.int4 primary key generated by default as identity, timeline_id pg_catalog.text);
|
||||
END IF;
|
||||
|
||||
-- preserve the timeline_id of the last drop_subscriptions run
|
||||
-- to ensure that the cleanup of a timeline is executed only once.
|
||||
-- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
|
||||
INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
|
||||
INSERT INTO neon.drop_subscriptions_done VALUES (1, pg_catalog.current_setting('neon.timeline_id'))
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET timeline_id = current_setting('neon.timeline_id');
|
||||
SET timeline_id = pg_catalog.current_setting('neon.timeline_id')::pg_catalog.text;
|
||||
END
|
||||
$$
|
||||
|
||||
@@ -15,15 +15,15 @@ BEGIN
|
||||
WHERE schema_name IN ('public')
|
||||
LOOP
|
||||
FOR grantor IN EXECUTE
|
||||
format(
|
||||
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee = %s',
|
||||
pg_catalog.format(
|
||||
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee OPERATOR(pg_catalog.=) %s',
|
||||
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
quote_literal({role_name})
|
||||
)
|
||||
LOOP
|
||||
EXECUTE format('SET LOCAL ROLE %I', grantor);
|
||||
EXECUTE pg_catalog.format('SET LOCAL ROLE %I', grantor);
|
||||
|
||||
revoke_query := format(
|
||||
revoke_query := pg_catalog.format(
|
||||
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY %I',
|
||||
schema,
|
||||
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
|
||||
@@ -5,17 +5,17 @@ DO ${outer_tag}$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname = 'public'
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
|
||||
)
|
||||
THEN
|
||||
SELECT nspowner::regrole::text
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname = 'public'
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.text
|
||||
INTO schema_owner;
|
||||
|
||||
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
|
||||
IF schema_owner OPERATOR(pg_catalog.=) 'cloud_admin'::pg_catalog.text OR schema_owner OPERATOR(pg_catalog.=) 'zenith_admin'::pg_catalog.text
|
||||
THEN
|
||||
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
|
||||
EXECUTE pg_catalog.format('ALTER SCHEMA public OWNER TO %I', {db_owner});
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
|
||||
@@ -3,10 +3,10 @@ DO ${outer_tag}$
|
||||
IF EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_database
|
||||
WHERE datname = {datname}
|
||||
WHERE datname OPERATOR(pg_catalog.=) {datname}::pg_catalog.name
|
||||
)
|
||||
THEN
|
||||
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
|
||||
EXECUTE pg_catalog.format('ALTER DATABASE %I is_template false', {datname});
|
||||
END IF;
|
||||
END
|
||||
${outer_tag}$;
|
||||
|
||||
File diff suppressed because it is too large
@@ -37,7 +37,7 @@
|
||||
//! <other PostgreSQL files>
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Display;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
|
||||
use std::path::PathBuf;
|
||||
@@ -58,8 +58,12 @@ use compute_api::responses::{
|
||||
};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
|
||||
PgIdent, RemoteExtSpec, Role,
|
||||
PageserverShardInfo, PgIdent, RemoteExtSpec, Role,
|
||||
};
|
||||
|
||||
// re-export these, because they're used in the reconfigure() function
|
||||
pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo};
|
||||
|
||||
use jsonwebtoken::jwk::{
|
||||
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
|
||||
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
|
||||
@@ -74,9 +78,11 @@ use sha2::{Digest, Sha256};
|
||||
use spki::der::Decode;
|
||||
use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
|
||||
use tracing::debug;
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::shard::ShardStripeSize;
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
|
||||
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT;
|
||||
use postgres_connection::parse_host_port;
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
@@ -387,9 +393,8 @@ pub struct EndpointStartArgs {
|
||||
pub endpoint_storage_addr: String,
|
||||
pub safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
pub safekeepers: Vec<NodeId>,
|
||||
pub pageservers: Vec<(PageserverProtocol, Host, u16)>,
|
||||
pub pageserver_conninfo: PageserverConnectionInfo,
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
pub shard_stripe_size: usize,
|
||||
pub create_test_user: bool,
|
||||
pub start_timeout: Duration,
|
||||
pub autoprewarm: bool,
|
||||
@@ -662,14 +667,6 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
|
||||
pageservers
|
||||
.iter()
|
||||
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
}
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
@@ -715,9 +712,6 @@ impl Endpoint {
|
||||
std::fs::remove_dir_all(self.pgdata())?;
|
||||
}
|
||||
|
||||
let pageserver_connstring = Self::build_pageserver_connstr(&args.pageservers);
|
||||
assert!(!pageserver_connstring.is_empty());
|
||||
|
||||
let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?;
|
||||
|
||||
// check for file remote_extensions_spec.json
|
||||
@@ -732,6 +726,44 @@ impl Endpoint {
|
||||
remote_extensions = None;
|
||||
};
|
||||
|
||||
// For the sake of backwards-compatibility, also fill in 'pageserver_connstring'
|
||||
//
|
||||
// XXX: I believe this is not really needed, except to make
|
||||
// test_forward_compatibility happy.
|
||||
//
|
||||
// Use a closure so that we can conveniently return None in the middle of the
|
||||
// loop.
|
||||
let pageserver_connstring: Option<String> = (|| {
|
||||
let num_shards = args.pageserver_conninfo.shard_count.count();
|
||||
let mut connstrings = Vec::new();
|
||||
for shard_no in 0..num_shards {
|
||||
let shard_index = ShardIndex {
|
||||
shard_count: args.pageserver_conninfo.shard_count,
|
||||
shard_number: ShardNumber(shard_no),
|
||||
};
|
||||
let shard = args
|
||||
.pageserver_conninfo
|
||||
.shards
|
||||
.get(&shard_index)
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"shard {} not found in pageserver_connection_info",
|
||||
shard_index
|
||||
)
|
||||
})?;
|
||||
let pageserver = shard
|
||||
.pageservers
|
||||
.first()
|
||||
.ok_or(anyhow!("must have at least one pageserver"))?;
|
||||
if let Some(libpq_url) = &pageserver.libpq_url {
|
||||
connstrings.push(libpq_url.clone());
|
||||
} else {
|
||||
return Ok::<_, anyhow::Error>(None);
|
||||
}
|
||||
}
|
||||
Ok(Some(connstrings.join(",")))
|
||||
})()?;
|
||||
|
||||
// Create config file
|
||||
let config = {
|
||||
let mut spec = ComputeSpec {
|
||||
@@ -776,13 +808,14 @@ impl Endpoint {
|
||||
branch_id: None,
|
||||
endpoint_id: Some(self.endpoint_id.clone()),
|
||||
mode: self.mode,
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
pageserver_connection_info: Some(args.pageserver_conninfo.clone()),
|
||||
pageserver_connstring,
|
||||
safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: args.auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(args.shard_stripe_size),
|
||||
shard_stripe_size: args.pageserver_conninfo.stripe_size, // redundant with pageserver_connection_info.stripe_size
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
@@ -966,7 +999,7 @@ impl Endpoint {
|
||||
// Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario.
|
||||
pub async fn update_pageservers_in_config(
|
||||
&self,
|
||||
pageservers: Vec<(PageserverProtocol, Host, u16)>,
|
||||
pageserver_conninfo: &PageserverConnectionInfo,
|
||||
) -> Result<()> {
|
||||
let config_path = self.endpoint_path().join("config.json");
|
||||
let mut config: ComputeConfig = {
|
||||
@@ -974,10 +1007,8 @@ impl Endpoint {
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
|
||||
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstring.is_empty());
|
||||
let mut spec = config.spec.unwrap();
|
||||
spec.pageserver_connstring = Some(pageserver_connstring);
|
||||
spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
|
||||
config.spec = Some(spec);
|
||||
|
||||
let file = std::fs::File::create(&config_path)?;
|
||||
@@ -1020,8 +1051,7 @@ impl Endpoint {
|
||||
|
||||
pub async fn reconfigure(
|
||||
&self,
|
||||
pageservers: Option<Vec<(PageserverProtocol, Host, u16)>>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
pageserver_conninfo: Option<&PageserverConnectionInfo>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
safekeeper_generation: Option<SafekeeperGeneration>,
|
||||
) -> Result<()> {
|
||||
@@ -1036,15 +1066,15 @@ impl Endpoint {
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
// If pageservers are not specified, don't change them.
|
||||
if let Some(pageservers) = pageservers {
|
||||
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
|
||||
|
||||
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
|
||||
spec.pageserver_connstring = Some(pageserver_connstr);
|
||||
if stripe_size.is_some() {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
if let Some(pageserver_conninfo) = pageserver_conninfo {
|
||||
// If pageservers are provided, we need to ensure that they are not empty.
|
||||
// This is a requirement for the compute_ctl configuration.
|
||||
anyhow::ensure!(
|
||||
!pageserver_conninfo.shards.is_empty(),
|
||||
"no pageservers provided"
|
||||
);
|
||||
spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
|
||||
spec.shard_stripe_size = pageserver_conninfo.stripe_size;
|
||||
}
|
||||
|
||||
// If safekeepers are not specified, don't change them.
|
||||
@@ -1093,11 +1123,9 @@ impl Endpoint {
|
||||
|
||||
pub async fn reconfigure_pageservers(
|
||||
&self,
|
||||
pageservers: Vec<(PageserverProtocol, Host, u16)>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
pageservers: &PageserverConnectionInfo,
|
||||
) -> Result<()> {
|
||||
self.reconfigure(Some(pageservers), stripe_size, None, None)
|
||||
.await
|
||||
self.reconfigure(Some(pageservers), None, None).await
|
||||
}
|
||||
|
||||
pub async fn reconfigure_safekeepers(
|
||||
@@ -1105,7 +1133,7 @@ impl Endpoint {
|
||||
safekeepers: Vec<NodeId>,
|
||||
generation: SafekeeperGeneration,
|
||||
) -> Result<()> {
|
||||
self.reconfigure(None, None, Some(safekeepers), Some(generation))
|
||||
self.reconfigure(None, Some(safekeepers), Some(generation))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1188,3 +1216,84 @@ impl Endpoint {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// If caller is telling us what pageserver to use, this is not a tenant which is
|
||||
/// fully managed by storage controller, therefore not sharded.
|
||||
pub fn local_pageserver_conf_to_conn_info(
|
||||
conf: &crate::local_env::PageServerConf,
|
||||
) -> Result<PageserverConnectionInfo> {
|
||||
let libpq_url = {
|
||||
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
|
||||
let port = port.unwrap_or(5432);
|
||||
Some(format!("postgres://no_user@{host}:{port}"))
|
||||
};
|
||||
let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
|
||||
let (host, port) = parse_host_port(grpc_addr)?;
|
||||
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
|
||||
Some(format!("grpc://no_user@{host}:{port}"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let ps_conninfo = PageserverShardConnectionInfo {
|
||||
id: Some(conf.id),
|
||||
libpq_url,
|
||||
grpc_url,
|
||||
};
|
||||
|
||||
let shard_info = PageserverShardInfo {
|
||||
pageservers: vec![ps_conninfo],
|
||||
};
|
||||
|
||||
let shards: HashMap<_, _> = vec![(ShardIndex::unsharded(), shard_info)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
Ok(PageserverConnectionInfo {
|
||||
shard_count: ShardCount::unsharded(),
|
||||
stripe_size: None,
|
||||
shards,
|
||||
prefer_protocol: PageserverProtocol::default(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tenant_locate_response_to_conn_info(
|
||||
response: &pageserver_api::controller_api::TenantLocateResponse,
|
||||
) -> Result<PageserverConnectionInfo> {
|
||||
let mut shards = HashMap::new();
|
||||
for shard in response.shards.iter() {
|
||||
tracing::info!("parsing {}", shard.listen_pg_addr);
|
||||
let libpq_url = {
|
||||
let host = &shard.listen_pg_addr;
|
||||
let port = shard.listen_pg_port;
|
||||
Some(format!("postgres://no_user@{host}:{port}"))
|
||||
};
|
||||
let grpc_url = if let Some(grpc_addr) = &shard.listen_grpc_addr {
|
||||
let host = grpc_addr;
|
||||
let port = shard.listen_grpc_port.expect("no gRPC port");
|
||||
Some(format!("grpc://no_user@{host}:{port}"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let shard_info = PageserverShardInfo {
|
||||
pageservers: vec![PageserverShardConnectionInfo {
|
||||
id: Some(shard.node_id),
|
||||
libpq_url,
|
||||
grpc_url,
|
||||
}],
|
||||
};
|
||||
|
||||
shards.insert(shard.shard_id.to_index(), shard_info);
|
||||
}
|
||||
|
||||
let stripe_size = if response.shard_params.count.is_unsharded() {
|
||||
None
|
||||
} else {
|
||||
Some(response.shard_params.stripe_size)
|
||||
};
|
||||
Ok(PageserverConnectionInfo {
|
||||
shard_count: response.shard_params.count,
|
||||
stripe_size,
|
||||
shards,
|
||||
prefer_protocol: PageserverProtocol::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1074,7 +1074,7 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow
|
||||
}
|
||||
|
||||
fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
|
||||
// openssl req -x509 -newkey rsa:2048 -nodes -subj "/CN=Neon Local CA" -days 36500 \
|
||||
// openssl req -x509 -newkey ed25519 -nodes -subj "/CN=Neon Local CA" -days 36500 \
|
||||
// -out rootCA.crt -keyout rootCA.key
|
||||
let keygen_output = Command::new("openssl")
|
||||
.args([
|
||||
@@ -1104,7 +1104,7 @@ fn generate_ssl_cert(
|
||||
let mut csr_path = cert_path.to_path_buf();
|
||||
csr_path.set_extension(".csr");
|
||||
|
||||
// openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr \
|
||||
// openssl req -new -nodes -newkey ed25519 -keyout server.key -out server.csr \
|
||||
// -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
|
||||
let keygen_output = Command::new("openssl")
|
||||
.args(["req", "-new", "-nodes"])
|
||||
|
||||
@@ -233,8 +233,8 @@ impl PageServerNode {
|
||||
let mut identity_file = std::fs::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(identity_file_path)
|
||||
.with_context(|| format!("open identity toml for write: {config_file_path:?}"))?;
|
||||
.open(&identity_file_path)
|
||||
.with_context(|| format!("open identity toml for write: {identity_file_path:?}"))?;
|
||||
let identity_toml = self.pageserver_make_identity_toml(node_id);
|
||||
identity_file
|
||||
.write_all(identity_toml.to_string().as_bytes())
|
||||
@@ -560,12 +560,12 @@ impl PageServerNode {
|
||||
.remove("sampling_ratio")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Falied to parse 'sampling_ratio'")?,
|
||||
.context("Failed to parse 'sampling_ratio'")?,
|
||||
relsize_snapshot_cache_capacity: settings
|
||||
.remove("relsize snapshot cache capacity")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
|
||||
.context("Failed to parse 'relsize_snapshot_cache_capacity' as integer")?,
|
||||
basebackup_cache_enabled: settings
|
||||
.remove("basebackup_cache_enabled")
|
||||
.map(|x| x.parse::<bool>())
|
||||
|
||||
@@ -303,6 +303,13 @@ enum Command {
|
||||
#[arg(long, required = true, value_delimiter = ',')]
|
||||
new_sk_set: Vec<NodeId>,
|
||||
},
|
||||
/// Abort ongoing safekeeper migration.
|
||||
TimelineSafekeeperMigrateAbort {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
timeline_id: TimelineId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -1396,6 +1403,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TimelineSafekeeperMigrateAbort {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} => {
|
||||
let path =
|
||||
format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
|
||||
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(Method::POST, path, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
docs/rfcs/2025-07-07-node-deletion-api-improvement.md (new file, 246 lines)
@@ -0,0 +1,246 @@
|
||||
# Node deletion API improvement
|
||||
|
||||
Created on 2025-07-07
|
||||
Implemented on _TBD_
|
||||
|
||||
## Summary
|
||||
|
||||
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
|
||||
nodes.
|
||||
|
||||
## Motivation
|
||||
|
||||
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
|
||||
has several limitations:
|
||||
|
||||
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
|
||||
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
|
||||
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
|
||||
- The process of node deletion is not graceful, i.e. it just imitates a node failure
|
||||
|
||||
In this context, "graceful" node deletion means that users do not experience any disruption or
|
||||
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
|
||||
can handle the workload and all requirements are met). To achieve this, the system must perform
|
||||
live migration of all tenant shards from the node being deleted while the node is still running
|
||||
and continue processing all incoming requests. The node is removed only after all tenant shards
|
||||
have been safely migrated.
|
||||
|
||||
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
|
||||
placement, such as not matching availability zones. This results in unnecessary work to optimize
|
||||
the placement that was just recently performed.
|
||||
|
||||
If we delete a node before its tenant shards are fully moved, the new node won't have all the
|
||||
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
|
||||
first. If there are many tenant shards, this slowdown affects a huge amount of users.
|
||||
|
||||
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
|
||||
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
|
||||
also cause deletion to wait if other processes are holding inner state lock. It also gets trickier
|
||||
because we need to handle other requests, like drain and fill, at the same time.
|
||||
|
||||
## Impacted components (e.g. pageserver, safekeeper, console, etc)
|
||||
|
||||
- storage controller
|
||||
- pageserver (indirectly)
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
### Tombstones

To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
and register functionality must be aware of tombstones. Additionally, new debug handlers are
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
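
To make the reattach/register exception concrete, here is a minimal Rust sketch of the idea; `StoredNode` and `allow_register` are illustrative names invented for this example and are not the actual storage controller types.

```rust
/// Hypothetical sketch only; the real storage controller types are not shown
/// in this diff.
struct StoredNode {
    node_id: u64,
    /// Corresponds to the `Deleted` value of the `NodeLifecycle` column.
    tombstoned: bool,
}

/// `node_register` must refuse a node whose row carries a tombstone.
fn allow_register(existing: Option<&StoredNode>) -> Result<(), String> {
    match existing {
        Some(node) if node.tombstoned => Err(format!(
            "node {} was deleted; refusing to re-register",
            node.node_id
        )),
        _ => Ok(()),
    }
}

fn main() {
    let dead = StoredNode { node_id: 7, tombstoned: true };
    assert!(allow_register(Some(&dead)).is_err());
    assert!(allow_register(None).is_ok());
}
```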
|
||||
|
||||
### Gracefulness
|
||||
|
||||
The problem of making node deletion graceful is complex and involves several challenges:
|
||||
|
||||
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
|
||||
if needed, e.g. if run by mistake.
|
||||
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
|
||||
deletion process. We need clear policies for handling concurrent operations: what happens when a
|
||||
drain/fill request arrives while deletion is in progress, and what happens when a delete request
|
||||
arrives while drain/fill is in progress.
|
||||
- **Persistent**: If the storage controller restarts during this long-running operation, we must
|
||||
preserve progress and automatically resume the deletion process after the storage controller
|
||||
restarts.
|
||||
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
|
||||
for deletion, as this would move shards to irrelevant locations. The drain process expects the
|
||||
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
|
||||
leaves secondary locations unmoved. This could result in unnecessary load on the storage
|
||||
controller and inefficient resource utilization.
|
||||
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
|
||||
time constraints or emergency situations require it, bypassing the normal graceful migration
|
||||
process.
|
||||
|
||||
See below for a detailed breakdown of the proposed changes and mechanisms.
|
||||
|
||||
#### Node lifecycle
|
||||
|
||||
New `NodeLifecycle` enum and a matching database field with these values:
|
||||
- `Active`: The normal state. All operations are allowed.
|
||||
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
|
||||
will happen later, but the node will eventually be removed. All operations are allowed.
|
||||
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
|
||||
back. The only action left is to remove its record from the database. Any attempt to register a
|
||||
node in this state will fail.
|
||||
|
||||
This state persists across storage controller restarts.
|
||||
|
||||
**State transition**
|
||||
```
|
||||
+--------------------+
|
||||
+---| Active |<---------------------+
|
||||
| +--------------------+ |
|
||||
| ^ |
|
||||
| start_node_delete | cancel_node_delete |
|
||||
v | |
|
||||
+----------------------------------+ |
|
||||
| ScheduledForDeletion | |
|
||||
+----------------------------------+ |
|
||||
| |
|
||||
| node_register |
|
||||
| |
|
||||
| delete_node (at the finish) |
|
||||
| |
|
||||
v |
|
||||
+---------+ tombstone_delete +----------+
|
||||
| Deleted |-------------------------------->| no row |
|
||||
+---------+ +----------+
|
||||
```
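
For illustration, a minimal Rust sketch of the state machine above. The enum mirrors the RFC's names, but the transition helper is an assumption made for this example, not the real storcon implementation.

```rust
/// Illustrative only: mirrors the three lifecycle states; the real storcon
/// enum and its transition checks may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeLifecycle {
    Active,
    ScheduledForDeletion,
    Deleted,
}

impl NodeLifecycle {
    /// True for the arrows drawn in the diagram above.
    fn can_transition_to(self, next: NodeLifecycle) -> bool {
        use NodeLifecycle::*;
        matches!(
            (self, next),
            // start_node_delete
            (Active, ScheduledForDeletion)
                // cancel_node_delete
                | (ScheduledForDeletion, Active)
                // delete_node, at the finish of the deletion process
                | (ScheduledForDeletion, Deleted)
        )
    }
}

fn main() {
    use NodeLifecycle::*;
    assert!(Active.can_transition_to(ScheduledForDeletion));
    assert!(!Deleted.can_transition_to(Active)); // deleted nodes cannot come back
}
```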
|
||||
|
||||
#### NodeSchedulingPolicy::Deleting
|
||||
|
||||
A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is
|
||||
running for the node right now. Only one node can have the `Deleting` policy at a time.
|
||||
|
||||
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
|
||||
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
|
||||
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
|
||||
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
|
||||
|
||||
`NodeSchedulingPolicy` transition details:
|
||||
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
|
||||
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
|
||||
policy to its previous value. The policy is persisted in storcon DB.
|
||||
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
|
||||
`NodeLifecycle::Deleted` prevents any further access to this field.
|
||||
|
||||
The deletion process cannot be initiated for nodes currently undergoing deployment-related
|
||||
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
|
||||
once the node transitions to either the `Active` or `Pause` state.
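
A rough sketch of the save-and-restore behaviour in steps 1 and 2 above, using made-up types rather than the real storcon node state:

```rust
/// Made-up types for illustration; not the real storcon node state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeSchedulingPolicy {
    Active,
    Pause,
    Deleting,
}

struct NodeState {
    policy: NodeSchedulingPolicy,
    /// Persisted in the real system so the previous policy survives restarts.
    policy_before_delete: Option<NodeSchedulingPolicy>,
}

impl NodeState {
    /// Step 1: remember the current policy and switch to `Deleting`.
    fn begin_delete(&mut self) {
        self.policy_before_delete = Some(self.policy);
        self.policy = NodeSchedulingPolicy::Deleting;
    }

    /// Step 2: on cancellation, restore whatever policy was active before.
    fn cancel_delete(&mut self) {
        if let Some(prev) = self.policy_before_delete.take() {
            self.policy = prev;
        }
    }
}

fn main() {
    let mut node = NodeState {
        policy: NodeSchedulingPolicy::Pause,
        policy_before_delete: None,
    };
    node.begin_delete();
    assert_eq!(node.policy, NodeSchedulingPolicy::Deleting);
    node.cancel_delete();
    assert_eq!(node.policy, NodeSchedulingPolicy::Pause);
}
```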
|
||||
|
||||
#### OperationTracker

A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
dedicated service state object responsible for managing all long-running node operations (drain,
fill, delete) with robust concurrency control.

Key responsibilities:
- Orchestrates the execution of operations
- Supports cancellation of currently running operations
- Enforces operation constraints, e.g. allowing only a single drain/fill operation at a time
- Persists deletion state, enabling recovery of pending deletions across restarts
- Ensures thread safety across concurrent requests
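
A simplified sketch of such a tracker is shown below; `OperationTracker`, `Operation`, and the method names are assumptions for illustration, not the real service types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Which long-running operation is in flight (illustrative only).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Operation {
    Drain(u64),
    Fill(u64),
    Delete(u64),
}

/// At most one drain/fill/delete runs at a time; the running one can be cancelled.
#[derive(Default)]
struct OperationTracker {
    inner: Mutex<Option<(Operation, Arc<AtomicBool>)>>,
}

impl OperationTracker {
    /// Try to start an operation; refuse if another one is already running.
    fn try_start(&self, op: Operation) -> Result<Arc<AtomicBool>, &'static str> {
        let mut inner = self.inner.lock().unwrap();
        if inner.is_some() {
            return Err("another operation is already running");
        }
        let cancel = Arc::new(AtomicBool::new(false));
        *inner = Some((op, cancel.clone()));
        Ok(cancel)
    }

    /// Ask the running operation (if any) to stop at the next opportunity.
    fn cancel_running(&self) {
        let inner = self.inner.lock().unwrap();
        if let Some((_, cancel)) = inner.as_ref() {
            cancel.store(true, Ordering::Relaxed);
        }
    }

    /// Clear the slot once the running operation has finished or was cancelled.
    fn finish(&self) {
        *self.inner.lock().unwrap() = None;
    }
}

fn main() {
    let tracker = OperationTracker::default();
    let _cancel_flag = tracker.try_start(Operation::Delete(1)).unwrap();
    // Only a single operation at a time: a concurrent drain request is refused.
    assert!(tracker.try_start(Operation::Drain(2)).is_err());
    tracker.cancel_running();
    tracker.finish();
    assert!(tracker.try_start(Operation::Fill(3)).is_ok());
}
```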
|
||||
|
||||
#### Attached tenant shard processing

When deleting a node, handle each attached tenant shard as follows:

1. Pick the best node to become the new attached (the candidate).
2. If the candidate already has this shard as a secondary:
   - Create a new secondary for the shard on another suitable node.

   Otherwise:
   - Create a secondary for the shard on the candidate node.
3. Wait until all secondaries are ready and pre-warmed.
4. Promote the candidate's secondary to attached.
5. Remove the secondary from the node being deleted.

This process safely moves all attached shards before deleting the node.
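
The per-shard steps could look roughly like the following Rust skeleton; the `Cluster` trait is a placeholder for the real scheduler and reconciler APIs, which are not shown here.

```rust
/// Illustrative skeleton only: `Cluster` stands in for the real scheduler and
/// reconciler APIs, which are not part of this RFC.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NodeId(u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ShardId(u32);

trait Cluster {
    fn best_attach_candidate(&self, shard: ShardId, excluding: NodeId) -> NodeId;
    fn has_secondary(&self, shard: ShardId, node: NodeId) -> bool;
    fn create_secondary(&mut self, shard: ShardId, node: NodeId);
    fn wait_secondaries_warm(&self, shard: ShardId);
    fn promote_to_attached(&mut self, shard: ShardId, node: NodeId);
    fn remove_secondary(&mut self, shard: ShardId, node: NodeId);
}

/// Move one attached shard off the node being deleted, following steps 1-5 above.
fn migrate_attached_shard<C: Cluster>(cluster: &mut C, shard: ShardId, deleting: NodeId) {
    // 1. Pick the node that should end up attached.
    let candidate = cluster.best_attach_candidate(shard, deleting);
    if cluster.has_secondary(shard, candidate) {
        // 2a. Candidate already holds a secondary: put a fresh secondary elsewhere.
        let spare = cluster.best_attach_candidate(shard, candidate);
        cluster.create_secondary(shard, spare);
    } else {
        // 2b. Otherwise create the secondary on the candidate itself.
        cluster.create_secondary(shard, candidate);
    }
    // 3. Wait until secondaries are ready and pre-warmed.
    cluster.wait_secondaries_warm(shard);
    // 4. Promote the candidate's secondary to attached.
    cluster.promote_to_attached(shard, candidate);
    // 5. Drop the copy that lived on the node being deleted.
    cluster.remove_secondary(shard, deleting);
}

fn main() {
    // A real `Cluster` implementation (the storcon scheduler) would be plugged in
    // here; the sketch above only shows the shape of the per-shard steps.
}
```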
|
||||
|
||||
#### Secondary tenant shard processing
|
||||
|
||||
When deleting a node, handle each secondary tenant shard as follows:
|
||||
|
||||
1. Choose the best node to become the new secondary.
|
||||
2. Create a secondary for the shard on that node.
|
||||
3. Wait until the new secondary is ready.
|
||||
4. Remove the secondary from the node being deleted.
|
||||
|
||||
This ensures all secondary shards are safely moved before deleting the node.
|
||||
|
||||
### Reliability, failure modes and corner cases
|
||||
|
||||
In case of a storage controller failure and following restart, the system behavior depends on the
|
||||
`NodeLifecycle` state:
|
||||
|
||||
- If `NodeLifecycle` is `Active`: No action is taken for this node.
|
||||
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
|
||||
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
|
||||
this node.
|
||||
|
||||
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
|
||||
- If `force` is set: The node deletion will proceed regardless of the node's availability.
|
||||
- If `force` is not set: The deletion will be retried a limited number of times. If the node
|
||||
remains unavailable, the deletion process will pause and automatically resume when the node
|
||||
becomes healthy again.
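
As a sketch of the restart path, a small helper that picks out the nodes whose deletion must be resumed; the types are illustrative and redeclared here so the example stands alone.

```rust
/// Not the real storcon definitions; redeclared so the example compiles on its own.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeLifecycle {
    Active,
    ScheduledForDeletion,
    Deleted,
}

struct PersistedNode {
    id: u64,
    lifecycle: NodeLifecycle,
}

/// On startup, deletion is resumed only for nodes persisted as `ScheduledForDeletion`.
fn deletions_to_resume(nodes: &[PersistedNode]) -> Vec<u64> {
    nodes
        .iter()
        .filter(|n| n.lifecycle == NodeLifecycle::ScheduledForDeletion)
        .map(|n| n.id)
        .collect()
}

fn main() {
    let nodes = vec![
        PersistedNode { id: 1, lifecycle: NodeLifecycle::Active },
        PersistedNode { id: 2, lifecycle: NodeLifecycle::ScheduledForDeletion },
        PersistedNode { id: 3, lifecycle: NodeLifecycle::Deleted },
    ];
    assert_eq!(deletions_to_resume(&nodes), vec![2]);
}
```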
|
||||
|
||||
### Operations concurrency
|
||||
|
||||
The following sections describe the behavior when different types of requests arrive at the storage
|
||||
controller and how they interact with ongoing operations.
|
||||
|
||||
#### Delete request
|
||||
|
||||
Handler: `PUT /control/v1/node/:node_id/delete`
|
||||
|
||||
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
|
||||
- Return `200 OK`: there is already an ongoing deletion request for this node
|
||||
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
|
||||
3. Persist current scheduling policy
|
||||
4. If there is no active operation (drain/fill/delete):
|
||||
- Run deletion process for this node
|
||||
|
||||
#### Cancel delete request
|
||||
|
||||
Handler: `DELETE /control/v1/node/:node_id/delete`
|
||||
|
||||
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
|
||||
- Return `404 Not Found`: there is no current deletion request for this node
|
||||
2. If the active operation is deleting this node, cancel it
|
||||
3. Update & persist lifecycle to `NodeLifecycle::Active`
|
||||
4. Restore the last scheduling policy from persistence
|
||||
|
||||
#### Drain/fill request
|
||||
|
||||
1. If there are already ongoing drain/fill processes:
|
||||
- Return `409 Conflict`: queueing of drain/fill processes is not supported
|
||||
2. If there is an ongoing delete process:
|
||||
- Cancel it and wait until it is cancelled
|
||||
3. Run the drain/fill process
|
||||
4. After the drain/fill process is cancelled or finished:
|
||||
- Try to find another candidate to delete and run the deletion process for that node
|
||||
|
||||
#### Drain/fill cancel request
|
||||
|
||||
1. If the active operation is not the related process:
|
||||
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
|
||||
2. Cancel the active operation
|
||||
3. Try to find another candidate to delete and run the deletion process for that node
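
A toy model of these precedence rules, with illustrative names only: a drain/fill request is rejected if another drain/fill is already running, and it pre-empts a running delete, which is resumed afterwards.

```rust
/// Illustrative names only; the real handlers live in the storage controller.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Running {
    Nothing,
    DrainOrFill,
    Delete,
}

#[derive(Debug, PartialEq, Eq)]
enum Decision {
    Start,
    /// 409 Conflict: queueing of drain/fill processes is not supported.
    RejectConflict,
    /// Cancel the in-flight deletion, run drain/fill, resume deletion afterwards.
    CancelDeleteThenStart,
}

fn on_drain_or_fill_request(running: Running) -> Decision {
    match running {
        Running::DrainOrFill => Decision::RejectConflict,
        Running::Delete => Decision::CancelDeleteThenStart,
        Running::Nothing => Decision::Start,
    }
}

fn main() {
    assert_eq!(
        on_drain_or_fill_request(Running::Delete),
        Decision::CancelDeleteThenStart
    );
    assert_eq!(
        on_drain_or_fill_request(Running::DrainOrFill),
        Decision::RejectConflict
    );
}
```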
|
||||
|
||||
## Definition of Done
|
||||
|
||||
- [x] Fix flaky node scenario and introduce related debug handlers
|
||||
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
|
||||
request regardless of draining/filling requests and restarts
|
||||
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
|
||||
recommended locations
|
||||
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
|
||||
process and deletion resumes after drain/fill completes
|
||||
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
|
||||
pageserver node does not respond)
|
||||
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::fmt::Display;
|
||||
|
||||
use crate::privilege::Privilege;
|
||||
use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
|
||||
@@ -49,7 +48,7 @@ pub struct ExtensionInstallResponse {
|
||||
/// Status of the LFC prewarm process. The same state machine is reused for
|
||||
/// both autoprewarm (prewarm after compute/Postgres start using the previously
|
||||
/// stored LFC state) and explicit prewarming via API.
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcPrewarmState {
|
||||
/// Default value when compute boots up.
|
||||
@@ -59,7 +58,14 @@ pub enum LfcPrewarmState {
|
||||
Prewarming,
|
||||
/// We found requested LFC state in the endpoint storage and
|
||||
/// completed prewarming successfully.
|
||||
Completed,
|
||||
Completed {
|
||||
total: i32,
|
||||
prewarmed: i32,
|
||||
skipped: i32,
|
||||
state_download_time_ms: u32,
|
||||
uncompress_time_ms: u32,
|
||||
prewarm_time_ms: u32,
|
||||
},
|
||||
/// Unexpected error happened during prewarming. Note, `Not Found 404`
|
||||
/// response from the endpoint storage is explicitly excluded here
|
||||
/// because it can normally happen on the first compute start,
|
||||
@@ -68,11 +74,15 @@ pub enum LfcPrewarmState {
|
||||
/// We tried to fetch the corresponding LFC state from the endpoint storage,
|
||||
/// but received `Not Found 404`. This should normally happen only during the
|
||||
/// first endpoint start after creation with `autoprewarm: true`.
|
||||
/// This may also happen if LFC is turned off or not initialized
|
||||
///
|
||||
/// During the orchestrated prewarm via API, when a caller explicitly
|
||||
/// provides the LFC state key to prewarm from, it's the caller responsibility
|
||||
/// to handle this status as an error state in this case.
|
||||
Skipped,
|
||||
/// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query
|
||||
/// has started working before cancellation
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl Display for LfcPrewarmState {
|
||||
@@ -80,32 +90,44 @@ impl Display for LfcPrewarmState {
|
||||
match self {
|
||||
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
|
||||
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
|
||||
LfcPrewarmState::Completed => f.write_str("Completed"),
|
||||
LfcPrewarmState::Completed { .. } => f.write_str("Completed"),
|
||||
LfcPrewarmState::Skipped => f.write_str("Skipped"),
|
||||
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
|
||||
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcOffloadState {
|
||||
#[default]
|
||||
NotOffloaded,
|
||||
Offloading,
|
||||
Completed,
|
||||
Completed {
|
||||
state_query_time_ms: u32,
|
||||
compress_time_ms: u32,
|
||||
state_upload_time_ms: u32,
|
||||
},
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
/// LFC state was empty so it wasn't offloaded
|
||||
Skipped,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
/// Response of /promote
|
||||
pub enum PromoteState {
|
||||
NotPromoted,
|
||||
Completed,
|
||||
Failed { error: String },
|
||||
Completed {
|
||||
lsn_wait_time_ms: u32,
|
||||
pg_promote_time_ms: u32,
|
||||
reconfigure_time_ms: u32,
|
||||
},
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Default, Debug)]
|
||||
|
||||
@@ -12,8 +12,9 @@ use regex::Regex;
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
|
||||
|
||||
use crate::responses::TlsConfig;
|
||||
|
||||
@@ -105,8 +106,27 @@ pub struct ComputeSpec {
|
||||
// updated to fill these fields, we can make these non optional.
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
|
||||
/// Pageserver information can be passed in three different ways:
|
||||
/// 1. Here in `pageserver_connection_info`
|
||||
/// 2. In the `pageserver_connstring` field.
|
||||
/// 3. in `cluster.settings`.
|
||||
///
|
||||
/// The goal is to use method 1. everywhere. But for backwards-compatibility with old
|
||||
/// versions of the control plane, `compute_ctl` will check 2. and 3. if the
|
||||
/// `pageserver_connection_info` field is missing.
|
||||
///
|
||||
/// If both `pageserver_connection_info` and `pageserver_connstring`+`shard_stripe_size` are
|
||||
/// given, they must contain the same information.
|
||||
pub pageserver_connection_info: Option<PageserverConnectionInfo>,
|
||||
|
||||
pub pageserver_connstring: Option<String>,
|
||||
|
||||
/// Stripe size for pageserver sharding, in pages. This is set together with the legacy
|
||||
/// `pageserver_connstring` field. When the modern `pageserver_connection_info` field is used,
|
||||
/// the stripe size is stored in `pageserver_connection_info.stripe_size` instead.
|
||||
pub shard_stripe_size: Option<ShardStripeSize>,
|
||||
|
||||
// More neon ids that we expose to the compute_ctl
|
||||
// and to postgres as neon extension GUCs.
|
||||
pub project_id: Option<String>,
|
||||
@@ -139,10 +159,6 @@ pub struct ComputeSpec {
|
||||
|
||||
pub pgbouncer_settings: Option<IndexMap<String, String>>,
|
||||
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
|
||||
/// Local Proxy configuration used for JWT authentication
|
||||
#[serde(default)]
|
||||
pub local_proxy_config: Option<LocalProxySpec>,
|
||||
@@ -217,6 +233,140 @@ pub enum ComputeFeature {
|
||||
UnknownFeature,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
|
||||
pub struct PageserverConnectionInfo {
|
||||
/// NB: 0 for unsharded tenants, 1 for sharded tenants with 1 shard, following storage
|
||||
pub shard_count: ShardCount,
|
||||
|
||||
/// INVARIANT: null if shard_count is 0, otherwise non-null and immutable
|
||||
pub stripe_size: Option<ShardStripeSize>,
|
||||
|
||||
pub shards: HashMap<ShardIndex, PageserverShardInfo>,
|
||||
|
||||
/// If the compute supports both protocols, this indicates which one it should use. The compute
|
||||
/// may use other available protocols too, if it doesn't support the preferred one. The URL's
|
||||
/// for the protocol specified here must be present for all shards, i.e. do not mark a protocol
|
||||
/// as preferred if it cannot actually be used with all the pageservers.
|
||||
#[serde(default)]
|
||||
pub prefer_protocol: PageserverProtocol,
|
||||
}
|
||||
|
||||
/// Extract PageserverConnectionInfo from a comma-separated list of libpq connection strings.
|
||||
///
|
||||
/// This is used for backwards-compatibility, to parse the legacy
|
||||
/// [ComputeSpec::pageserver_connstring] field, or the 'neon.pageserver_connstring' GUC. Nowadays,
|
||||
/// the 'pageserver_connection_info' field should be used instead.
|
||||
impl PageserverConnectionInfo {
|
||||
pub fn from_connstr(
|
||||
connstr: &str,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
) -> Result<PageserverConnectionInfo, anyhow::Error> {
|
||||
let shard_infos: Vec<_> = connstr
|
||||
.split(',')
|
||||
.map(|connstr| PageserverShardInfo {
|
||||
pageservers: vec![PageserverShardConnectionInfo {
|
||||
id: None,
|
||||
libpq_url: Some(connstr.to_string()),
|
||||
grpc_url: None,
|
||||
}],
|
||||
})
|
||||
.collect();
|
||||
|
||||
match shard_infos.len() {
|
||||
0 => anyhow::bail!("empty connection string"),
|
||||
1 => {
|
||||
// We assume that if there's only one connection string, it means "unsharded",
|
||||
// rather than a sharded system with just a single shard. The latter is
|
||||
// possible in principle, but we never do it.
|
||||
let shard_count = ShardCount::unsharded();
|
||||
let only_shard = shard_infos.first().unwrap().clone();
|
||||
let shards = vec![(ShardIndex::unsharded(), only_shard)];
|
||||
Ok(PageserverConnectionInfo {
|
||||
shard_count,
|
||||
stripe_size: None,
|
||||
shards: shards.into_iter().collect(),
|
||||
prefer_protocol: PageserverProtocol::Libpq,
|
||||
})
|
||||
}
|
||||
n => {
|
||||
if stripe_size.is_none() {
|
||||
anyhow::bail!("{n} shards but no stripe_size");
|
||||
}
|
||||
let shard_count = ShardCount(n.try_into()?);
|
||||
let shards = shard_infos
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, shard_info)| {
|
||||
(
|
||||
ShardIndex {
|
||||
shard_count,
|
||||
shard_number: ShardNumber(
|
||||
idx.try_into().expect("shard number fits in u8"),
|
||||
),
|
||||
},
|
||||
shard_info,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
Ok(PageserverConnectionInfo {
|
||||
shard_count,
|
||||
stripe_size,
|
||||
shards,
|
||||
prefer_protocol: PageserverProtocol::Libpq,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience routine to get the connection string for a shard.
|
||||
pub fn shard_url(
|
||||
&self,
|
||||
shard_number: ShardNumber,
|
||||
protocol: PageserverProtocol,
|
||||
) -> anyhow::Result<&str> {
|
||||
let shard_index = ShardIndex {
|
||||
shard_number,
|
||||
shard_count: self.shard_count,
|
||||
};
|
||||
let shard = self.shards.get(&shard_index).ok_or_else(|| {
|
||||
anyhow::anyhow!("shard connection info missing for shard {}", shard_index)
|
||||
})?;
|
||||
|
||||
// Just use the first pageserver in the list. That's good enough for this
|
||||
// convenience routine; if you need more control, like round robin policy or
|
||||
// failover support, roll your own. (As of this writing, we never have more than
|
||||
// one pageserver per shard anyway, but that will change in the future.)
|
||||
let pageserver = shard
|
||||
.pageservers
|
||||
.first()
|
||||
.ok_or(anyhow::anyhow!("must have at least one pageserver"))?;
|
||||
|
||||
let result = match protocol {
|
||||
PageserverProtocol::Grpc => pageserver
|
||||
.grpc_url
|
||||
.as_ref()
|
||||
.ok_or(anyhow::anyhow!("no grpc_url for shard {shard_index}"))?,
|
||||
PageserverProtocol::Libpq => pageserver
|
||||
.libpq_url
|
||||
.as_ref()
|
||||
.ok_or(anyhow::anyhow!("no libpq_url for shard {shard_index}"))?,
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverShardInfo {
    pub pageservers: Vec<PageserverShardConnectionInfo>,
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverShardConnectionInfo {
    pub id: Option<NodeId>,
    pub libpq_url: Option<String>,
    pub grpc_url: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct RemoteExtSpec {
    pub public_extensions: Option<Vec<String>>,
@@ -334,6 +484,12 @@ impl ComputeMode {
    }
}

impl Display for ComputeMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.to_type_str())
    }
}

/// Log level for audit logging
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
@@ -470,13 +626,15 @@ pub struct JwksSettings {
    pub jwt_audience: Option<String>,
}

/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub enum PageserverProtocol {
    /// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
    #[default]
    #[serde(rename = "libpq")]
    Libpq,
    /// A newer, gRPC-based protocol. Uses grpc:// scheme.
    #[serde(rename = "grpc")]
    Grpc,
}

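The removed doc comment notes that this enum used to be parsed from the connection-string scheme. A hypothetical helper, not part of this diff, that maps the scheme names documented on the variants to the enum:

fn protocol_from_scheme(scheme: &str) -> Option<PageserverProtocol> {
    // Scheme names follow the variant doc comments above; the function itself
    // is only an illustration, not code from the repository.
    match scheme {
        "postgresql" | "postgres" => Some(PageserverProtocol::Libpq),
        "grpc" => Some(PageserverProtocol::Grpc),
        _ => None,
    }
}
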
@@ -558,11 +558,11 @@ async fn add_request_id_header_to_response(
    mut res: Response<Body>,
    req_info: RequestInfo,
) -> Result<Response<Body>, ApiError> {
    if let Some(request_id) = req_info.context::<RequestId>() {
        if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) {
            res.headers_mut()
                .insert(&X_REQUEST_ID_HEADER, request_header_value);
        };
    if let Some(request_id) = req_info.context::<RequestId>()
        && let Ok(request_header_value) = HeaderValue::from_str(&request_id.0)
    {
        res.headers_mut()
            .insert(&X_REQUEST_ID_HEADER, request_header_value);
    };

    Ok(res)

@@ -72,10 +72,10 @@ impl Server {
        if err.is_incomplete_message() || err.is_closed() || err.is_timeout() {
            return true;
        }
        if let Some(inner) = err.source() {
            if let Some(io) = inner.downcast_ref::<std::io::Error>() {
                return suppress_io_error(io);
            }
        if let Some(inner) = err.source()
            && let Some(io) = inner.downcast_ref::<std::io::Error>()
        {
            return suppress_io_error(io);
        }
        false
    }

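Both hunks above replace a nested `if let` with a let-chain. A small self-contained sketch of the same pattern (illustrative only, not from the repository; it relies on the 2024 edition used by this workspace):

fn first_even_doubled(values: &[Option<i32>]) -> Option<i32> {
    // Two fallible bindings and a boolean guard share one `if`, so there is no
    // extra nesting level and no dangling inner block.
    if let Some(first) = values.first()
        && let Some(v) = first
        && v % 2 == 0
    {
        return Some(v * 2);
    }
    None
}
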
@@ -363,7 +363,7 @@ where
    // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to
    // _slowly_ iterate through all buckets with its clock hand, without holding a lock.
    // If we switch to an Iterator, it must not hold the lock.
    pub fn get_at_bucket(&self, pos: usize) -> Option<ValueReadGuard<(K, V)>> {
    pub fn get_at_bucket(&self, pos: usize) -> Option<ValueReadGuard<'_, (K, V)>> {
        let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read();
        if pos >= map.buckets.len() {
            return None;

@@ -5,8 +5,12 @@ edition = "2024"
license.workspace = true

[features]
default = ["io-align-512"]
# See pageserver/Cargo.toml
testing = ["dep:nix"]
# Direct IO alignment options (mutually exclusive)
io-align-512 = []
io-align-4k = []

[dependencies]
serde.workspace = true

@@ -703,6 +703,11 @@ pub mod defaults {

    pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;

    #[cfg(feature = "io-align-4k")]
    pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 4096;
    #[cfg(all(feature = "io-align-512", not(feature = "io-align-4k")))]
    pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
    #[cfg(not(any(feature = "io-align-512", feature = "io-align-4k")))]
    pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;

    pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";

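The Cargo.toml comment above calls the two alignment features mutually exclusive, while the cfg chain simply lets io-align-4k win if both are set. One possible compile-time guard for that exclusivity, shown here only as a hedged sketch and not present in the diff:

// Hypothetical guard; the actual crate relies on the cfg chain above instead.
#[cfg(all(feature = "io-align-512", feature = "io-align-4k"))]
compile_error!("features `io-align-512` and `io-align-4k` are mutually exclusive");
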
@@ -9,10 +9,7 @@ regex.workspace = true
bytes.workspace = true
anyhow.workspace = true
crc32c.workspace = true
criterion.workspace = true
once_cell.workspace = true
log.workspace = true
memoffset.workspace = true
pprof.workspace = true
thiserror.workspace = true
serde.workspace = true
@@ -22,6 +19,7 @@ tracing.workspace = true
postgres_versioninfo.workspace = true

[dev-dependencies]
criterion.workspace = true
env_logger.workspace = true
postgres.workspace = true

@@ -34,9 +34,8 @@ const SIZEOF_CONTROLDATA: usize = size_of::<ControlFileData>();
impl ControlFileData {
    /// Compute the offset of the `crc` field within the `ControlFileData` struct.
    /// Equivalent to offsetof(ControlFileData, crc) in C.
    // Someday this can be const when the right compiler features land.
    fn pg_control_crc_offset() -> usize {
        memoffset::offset_of!(ControlFileData, crc)
    const fn pg_control_crc_offset() -> usize {
        std::mem::offset_of!(ControlFileData, crc)
    }

    ///

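The hunk above swaps the memoffset crate for the built-in macro and makes the function const. A standalone illustration with a hypothetical struct (not from the repository); std::mem::offset_of! is usable in const context on current stable Rust:

#[repr(C)]
struct Header {
    magic: u32,
    crc: u32,
}
// Evaluated at compile time; mirrors what pg_control_crc_offset() now does.
const CRC_OFFSET: usize = std::mem::offset_of!(Header, crc);
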
@@ -4,12 +4,11 @@
use crate::pg_constants;
use crate::transaction_id_precedes;
use bytes::BytesMut;
use log::*;

use super::bindings::MultiXactId;

pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) {
    trace!(
    tracing::trace!(
        "handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort, 3-sub_commit)",
        status
    );

@@ -14,7 +14,6 @@ use super::xlog_utils::*;
use crate::WAL_SEGMENT_SIZE;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
use std::num::NonZeroU32;
use utils::lsn::Lsn;
@@ -236,7 +235,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
        // XLOG_SWITCH records are special. If we see one, we need to skip
        // to the next WAL segment.
        let next_lsn = if xlogrec.is_xlog_switch_record() {
            trace!("saw xlog switch record at {}", self.lsn);
            tracing::trace!("saw xlog switch record at {}", self.lsn);
            self.lsn + self.lsn.calc_padding(WAL_SEGMENT_SIZE as u64)
        } else {
            // Pad to an 8-byte boundary

@@ -23,8 +23,6 @@ use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use bytes::BytesMut;
use bytes::{Buf, Bytes};

use log::*;

use serde::Serialize;
use std::ffi::{CString, OsStr};
use std::fs::File;
@@ -235,7 +233,7 @@ pub fn find_end_of_wal(
    let mut curr_lsn = start_lsn;
    let mut buf = [0u8; XLOG_BLCKSZ];
    let pg_version = MY_PGVERSION;
    debug!("find_end_of_wal PG_VERSION: {}", pg_version);
    tracing::debug!("find_end_of_wal PG_VERSION: {}", pg_version);

    let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);

@@ -247,7 +245,7 @@ pub fn find_end_of_wal(
        match open_wal_segment(&seg_file_path)? {
            None => {
                // no more segments
                debug!(
                tracing::debug!(
                    "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
                    result, seg_file_path
                );
@@ -260,7 +258,7 @@ pub fn find_end_of_wal(
            while curr_lsn.segment_number(wal_seg_size) == segno {
                let bytes_read = segment.read(&mut buf)?;
                if bytes_read == 0 {
                    debug!(
                    tracing::debug!(
                        "find_end_of_wal reached end at {:?}, EOF in segment {:?} at offset {}",
                        result,
                        seg_file_path,
@@ -276,7 +274,7 @@ pub fn find_end_of_wal(
                match decoder.poll_decode() {
                    Ok(Some(record)) => result = record.0,
                    Err(e) => {
                        debug!(
                        tracing::debug!(
                            "find_end_of_wal reached end at {:?}, decode error: {:?}",
                            result, e
                        );

@@ -9,7 +9,7 @@ use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody};
pub use self::sqlstate::*;

#[allow(clippy::unreadable_literal)]
mod sqlstate;
pub mod sqlstate;

/// The severity of a Postgres error or notice.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]

@@ -19,7 +19,8 @@ camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["multipart", "stream"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
@@ -41,6 +42,10 @@ http-types.workspace = true
http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
gcp_auth = "0.12.3"
url.workspace = true
http.workspace = true
uuid.workspace = true

byteorder = "1.4"
rand.workspace = true

@@ -41,6 +41,7 @@ impl RemoteStorageKind {
            RemoteStorageKind::LocalFs { .. } => None,
            RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
            RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
            RemoteStorageKind::GCS(config) => Some(&config.bucket_name),
        }
    }
}
@@ -51,6 +52,7 @@ impl RemoteStorageConfig {
        match &self.storage {
            RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
            RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
            RemoteStorageKind::GCS(c) => c.concurrency_limit.into(),
            RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
        }
    }
@@ -85,6 +87,9 @@ pub enum RemoteStorageKind {
    /// Azure Blob based storage, storing all files in the container
    /// specified by the config
    AzureContainer(AzureConfig),
    /// Google Cloud based storage, storing all files in the GCS bucket
    /// specified by the config
    GCS(GCSConfig),
}

#[derive(Deserialize)]
@@ -176,6 +181,32 @@ impl Debug for S3Config {
    }
}

#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GCSConfig {
    /// Name of the bucket to connect to.
    pub bucket_name: String,
    /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
    pub prefix_in_bucket: Option<String>,
    #[serde(default = "default_remote_storage_s3_concurrency_limit")]
    pub concurrency_limit: NonZeroUsize,
    #[serde(default = "default_max_keys_per_list_response")]
    pub max_keys_per_list_response: Option<i32>,
}

impl Debug for GCSConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GCSConfig")
            .field("bucket_name", &self.bucket_name)
            .field("prefix_in_bucket", &self.prefix_in_bucket)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
@@ -304,6 +335,30 @@ timeout = '5s'";
        );
    }

    #[test]
    fn test_gcs_parsing() {
        let toml = "\
bucket_name = 'foo-bar'
prefix_in_bucket = '/pageserver'
";

        let config = parse(toml).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::GCS(GCSConfig {
                    bucket_name: "foo-bar".into(),
                    prefix_in_bucket: Some("pageserver/".into()),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                    concurrency_limit: std::num::NonZero::new(100).unwrap(),
                }),
                timeout: Duration::from_secs(120),
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }

    #[test]
    fn test_s3_parsing() {
        let toml = "\

libs/remote_storage/src/gcs_bucket.rs | 1434 lines (new file)
File diff suppressed because it is too large

@@ -12,6 +12,7 @@
mod azure_blob;
mod config;
mod error;
mod gcs_bucket;
mod local_fs;
mod metrics;
mod s3_bucket;
@@ -43,10 +44,11 @@ use tokio_util::sync::CancellationToken;
use tracing::info;

pub use self::azure_blob::AzureBlobStorage;
pub use self::gcs_bucket::GCSBucket;
pub use self::local_fs::LocalFs;
pub use self::s3_bucket::S3Bucket;
pub use self::simulate_failures::UnreliableWrapper;
pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
pub use crate::config::{AzureConfig, GCSConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};

/// Default concurrency limit for S3 operations
///
@@ -81,8 +83,12 @@ pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;

pub const MAX_KEYS_PER_DELETE_GCS: usize = 1000;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

const GCS_SCOPES: &[&str] = &["https://www.googleapis.com/auth/cloud-platform"];

/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
@@ -182,6 +188,7 @@ pub struct VersionListing {
    pub versions: Vec<Version>,
}

#[derive(Debug)]
pub struct Version {
    pub key: RemotePath,
    pub last_modified: SystemTime,
@@ -203,6 +210,47 @@ pub enum VersionKind {
    Version(VersionId),
}

// I was going to do an `enum GenericVersion` but this feels cleaner.
#[derive(Default)]
pub struct GCSVersionListing {
    pub versions: Vec<GCSVersion>,
}

#[derive(Debug)]
pub struct GCSVersion {
    pub key: RemotePath,
    pub last_modified: SystemTime,
    pub id: VersionId,
    pub time_deleted: Option<SystemTime>,
}

impl From<GCSVersionListing> for VersionListing {
    fn from(gcs_listing: GCSVersionListing) -> Self {
        let version_listing = gcs_listing
            .versions
            .into_iter()
            .map(
                |GCSVersion {
                     key,
                     last_modified,
                     id,
                     ..
                 }| {
                    Version {
                        key,
                        last_modified,
                        kind: VersionKind::Version(VersionId(id.0)),
                    }
                },
            )
            .collect::<Vec<Version>>();

        VersionListing {
            versions: version_listing,
        }
    }
}

/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
    /// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -481,6 +529,7 @@ pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
    AwsS3(Arc<S3Bucket>),
    AzureBlob(Arc<AzureBlobStorage>),
    Unreliable(Other),
    GCS(Arc<GCSBucket>),
}

impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
@@ -497,6 +546,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::GCS(s) => s.list(prefix, mode, max_keys, cancel).await,
        }
    }

@@ -514,6 +564,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
            Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
            Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
            Self::GCS(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
        }
    }

@@ -530,6 +581,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
            Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
            Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
            Self::GCS(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
        }
    }

@@ -544,6 +596,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.head_object(key, cancel).await,
            Self::AzureBlob(s) => s.head_object(key, cancel).await,
            Self::Unreliable(s) => s.head_object(key, cancel).await,
            Self::GCS(s) => s.head_object(key, cancel).await,
        }
    }

@@ -561,6 +614,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::GCS(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
        }
    }

@@ -576,6 +630,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.download(from, opts, cancel).await,
            Self::AzureBlob(s) => s.download(from, opts, cancel).await,
            Self::Unreliable(s) => s.download(from, opts, cancel).await,
            Self::GCS(s) => s.download(from, opts, cancel).await,
        }
    }

@@ -590,6 +645,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.delete(path, cancel).await,
            Self::AzureBlob(s) => s.delete(path, cancel).await,
            Self::Unreliable(s) => s.delete(path, cancel).await,
            Self::GCS(s) => s.delete(path, cancel).await,
        }
    }

@@ -604,6 +660,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
            Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
            Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
            Self::GCS(s) => s.delete_objects(paths, cancel).await,
        }
    }

@@ -614,6 +671,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.max_keys_per_delete(),
            Self::AzureBlob(s) => s.max_keys_per_delete(),
            Self::Unreliable(s) => s.max_keys_per_delete(),
            Self::GCS(s) => s.max_keys_per_delete(),
        }
    }

@@ -628,6 +686,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
            Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
            Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
            Self::GCS(s) => s.delete_prefix(prefix, cancel).await,
        }
    }

@@ -643,6 +702,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
            Self::AwsS3(s) => s.copy(from, to, cancel).await,
            Self::AzureBlob(s) => s.copy(from, to, cancel).await,
            Self::Unreliable(s) => s.copy(from, to, cancel).await,
            Self::GCS(s) => s.copy(from, to, cancel).await,
        }
    }

@@ -672,6 +732,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
                    .await
            }
            Self::GCS(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
                    .await
            }
        }
    }
}
@@ -687,11 +751,18 @@ impl GenericRemoteStorage {
    }

    pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
        info!("RemoteStorageConfig: {:?}", storage_config);

        let timeout = storage_config.timeout;

        // If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
        // If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
        let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);

        info!(
            "RemoteStorageConfig's storage attribute: {:?}",
            storage_config.storage
        );

        Ok(match &storage_config.storage {
            RemoteStorageKind::LocalFs { local_path: path } => {
                info!("Using fs root '{path}' as a remote storage");
@@ -729,6 +800,16 @@ impl GenericRemoteStorage {
                    small_timeout,
                )?))
            }
            RemoteStorageKind::GCS(gcs_config) => {
                let google_application_credentials =
                    std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
                        .unwrap_or_else(|_| "<none>".into());
                info!(
                    "Using gcs bucket '{}' as a remote storage, prefix in bucket: '{:?}', GOOGLE_APPLICATION_CREDENTIALS: {google_application_credentials}",
                    gcs_config.bucket_name, gcs_config.prefix_in_bucket
                );
                Self::GCS(Arc::new(GCSBucket::new(gcs_config, timeout).await?))
            }
        })
    }

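Taken together with the config additions earlier in the diff, wiring up the new GCS backend at runtime looks roughly like the sketch below. The bucket name, prefix, and timeouts are placeholders; only the types and the from_config call come from this diff, and credentials are assumed to be supplied via GOOGLE_APPLICATION_CREDENTIALS as logged above.

async fn gcs_storage_example() -> anyhow::Result<GenericRemoteStorage> {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::GCS(GCSConfig {
            bucket_name: "my-bucket".to_string(),          // placeholder
            prefix_in_bucket: Some("pageserver/".to_string()), // placeholder
            concurrency_limit: std::num::NonZero::new(100).unwrap(),
            max_keys_per_list_response: None,
        }),
        timeout: std::time::Duration::from_secs(120),
        small_timeout: std::time::Duration::from_secs(30),
    };
    // Delegates to the GCS arm of from_config() shown above.
    GenericRemoteStorage::from_config(&config).await
}
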
@@ -764,6 +845,7 @@ impl GenericRemoteStorage {
            Self::AwsS3(s) => Some(s.bucket_name()),
            Self::AzureBlob(s) => Some(s.container_name()),
            Self::Unreliable(_s) => None,
            Self::GCS(s) => Some(s.bucket_name()),
        }
    }
}

@@ -63,6 +63,7 @@ impl UnreliableWrapper {
            GenericRemoteStorage::Unreliable(_s) => {
                panic!("Can't wrap unreliable wrapper unreliably")
            }
            GenericRemoteStorage::GCS(s) => GenericRemoteStorage::GCS(s),
        };
        let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
        UnreliableWrapper {

libs/remote_storage/tests/test_real_gcs.rs | 255 lines (new file)
@@ -0,0 +1,255 @@
#![allow(dead_code)]
#![allow(unused)]

mod common;

use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use futures::stream::Stream;
use remote_storage::{
    DownloadKind, DownloadOpts, GCSConfig, GenericRemoteStorage, ListingMode, RemotePath,
    RemoteStorageConfig, RemoteStorageKind, StorageMetadata,
};
use std::collections::HashMap;
#[path = "common/tests.rs"]
use std::collections::HashSet;
use std::fmt::{Debug, Display};
use std::io::Cursor;
use std::ops::Bound;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use test_context::{AsyncTestContext, test_context};
use tokio_util::sync::CancellationToken;
use utils::backoff;

// A minimal working GCS client I can pass around in async context

const BASE_PREFIX: &str = "test";

async fn create_gcs_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {
    let bucket_name = std::env::var("GCS_TEST_BUCKET").expect("GCS_TEST_BUCKET must be set");
    let gcs_config = GCSConfig {
        bucket_name,
        prefix_in_bucket: Some("testing-path/".into()),
        max_keys_per_list_response: Some(100),
        concurrency_limit: std::num::NonZero::new(100).unwrap(),
    };

    let remote_storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::GCS(gcs_config),
        timeout: Duration::from_secs(120),
        small_timeout: std::time::Duration::from_secs(120),
    };
    Ok(Arc::new(
        GenericRemoteStorage::from_config(&remote_storage_config)
            .await
            .context("remote storage init")?,
    ))
}

struct EnabledGCS {
    client: Arc<GenericRemoteStorage>,
    base_prefix: &'static str,
}

impl EnabledGCS {
    async fn setup() -> Self {
        let client = create_gcs_client()
            .await
            .context("gcs client creation")
            .expect("gcs client creation failed");
        EnabledGCS {
            client,
            base_prefix: BASE_PREFIX,
        }
    }
}

impl AsyncTestContext for EnabledGCS {
    async fn setup() -> Self {
        Self::setup().await
    }
}

#[test_context(EnabledGCS)]
#[tokio::test]
async fn gcs_test_suite(ctx: &mut EnabledGCS) -> anyhow::Result<()> {
    // ------------------------------------------------
    // --- `time_travel_recover`, showcasing `upload`, `delete_objects`, `copy`
    // ------------------------------------------------

    // Our test depends on discrepancies in the clock between S3 and the environment the tests
    // run in. Therefore, wait a little bit before and after. The alternative would be
    // to take the time from S3 response headers.
    const WAIT_TIME: Duration = Duration::from_millis(3_000);

    async fn retry<T, O, F, E>(op: O) -> Result<T, E>
    where
        E: Display + Debug + 'static,
        O: FnMut() -> F,
        F: Future<Output = Result<T, E>>,
    {
        let warn_threshold = 3;
        let max_retries = 10;
        backoff::retry(
            op,
            |_e| false,
            warn_threshold,
            max_retries,
            "test retry",
            &CancellationToken::new(),
        )
        .await
        .expect("never cancelled")
    }

    async fn time_point() -> SystemTime {
        tokio::time::sleep(WAIT_TIME).await;
        let ret = SystemTime::now();
        tokio::time::sleep(WAIT_TIME).await;
        ret
    }

    async fn list_files(
        client: &Arc<GenericRemoteStorage>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<HashSet<RemotePath>> {
        Ok(
            retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
                .await
                .context("list root files failure")?
                .keys
                .into_iter()
                .map(|o| o.key)
                .collect::<HashSet<_>>(),
        )
    }

    let cancel = CancellationToken::new();

    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
        .with_context(|| "RemotePath conversion")?;

    // ---------------- t0 ---------------
    // Upload 'path1'
    retry(|| {
        let (data, len) = upload_stream("remote blob data1".as_bytes().into());
        ctx.client.upload(data, len, &path1, None, &cancel)
    })
    .await?;
    let t0_files = list_files(&ctx.client, &cancel).await?;
    let t0 = time_point().await;

    // Show 'path1'
    println!("at t0: {t0_files:?}");

    // Upload 'path2'
    let old_data = "remote blob data2";
    retry(|| {
        let (data, len) = upload_stream(old_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    // ---------------- t1 ---------------
    // Show 'path1' and 'path2'
    let t1_files = list_files(&ctx.client, &cancel).await?;
    let t1 = time_point().await;
    println!("at t1: {t1_files:?}");

    {
        let opts = DownloadOpts::default();
        let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
        let last_modified = dl.last_modified;
        let half_wt = WAIT_TIME.mul_f32(0.5);
        let t0_hwt = t0 + half_wt;
        let t1_hwt = t1 - half_wt;
        if !(t0_hwt..=t1_hwt).contains(&last_modified) {
            panic!(
                "last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
                This likely means a large clock discrepancy between S3 and the local clock."
            );
        }
    }

    // Upload 'path3'
    retry(|| {
        let (data, len) = upload_stream("remote blob data3".as_bytes().into());
        ctx.client.upload(data, len, &path3, None, &cancel)
    })
    .await?;

    // Overwrite 'path2'
    let new_data = "new remote blob data2";
    retry(|| {
        let (data, len) = upload_stream(new_data.as_bytes().into());
        ctx.client.upload(data, len, &path2, None, &cancel)
    })
    .await?;

    // Delete 'path1'
    retry(|| ctx.client.delete(&path1, &cancel)).await?;

    // Show 'path2' and `path3`
    let t2_files = list_files(&ctx.client, &cancel).await?;
    let t2 = time_point().await;
    println!("at t2: {t2_files:?}");

    // No changes after recovery to t2 (no-op)
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t2, t_final, &cancel, None)
        .await?;
    let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t2: {t2_files_recovered:?}");

    assert_eq!(t2_files, t2_files_recovered);
    let path2_recovered_t2 = download_to_vec(
        ctx.client
            .download(&path2, &DownloadOpts::default(), &cancel)
            .await?,
    )
    .await?;
    assert_eq!(path2_recovered_t2, new_data.as_bytes());

    // after recovery to t1: path1 is back, path2 has the old content
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t1, t_final, &cancel, None)
        .await?;
    let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t1: {t1_files_recovered:?}");
    assert_eq!(t1_files, t1_files_recovered);
    let path2_recovered_t1 = download_to_vec(
        ctx.client
            .download(&path2, &DownloadOpts::default(), &cancel)
            .await?,
    )
    .await?;
    assert_eq!(path2_recovered_t1, old_data.as_bytes());

    // after recovery to t0: everything is gone except for path1
    let t_final = time_point().await;
    ctx.client
        .time_travel_recover(None, t0, t_final, &cancel, None)
        .await?;
    let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
    println!("after recovery to t0: {t0_files_recovered:?}");
    assert_eq!(t0_files, t0_files_recovered);

    // cleanup
    let paths = &[path1, path2, path3];
    retry(|| ctx.client.delete_objects(paths, &cancel)).await?;

    Ok(())
}

@@ -49,7 +49,7 @@ impl PerfSpan {
        }
    }

    pub fn enter(&self) -> PerfSpanEntered {
    pub fn enter(&self) -> PerfSpanEntered<'_> {
        if let Some(ref id) = self.inner.id() {
            self.dispatch.enter(id);
        }

Some files were not shown because too many files have changed in this diff.