mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
1 Commits
alexk/comp
...
jcsp/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d18b74324 |
@@ -348,10 +348,6 @@ jobs:
|
||||
rerun_failed: true
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
|
||||
# Attempt to stop tests gracefully to generate test reports
|
||||
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
|
||||
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }}
|
||||
env:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
name: Force Test Upgrading of Extension
|
||||
on:
|
||||
schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '45 2 * * *' # run once a day, timezone is utc
|
||||
workflow_dispatch: # adds ability to run this manually
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow
|
||||
group: ${{ github.workflow }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
regress:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
pg-version: [16, 17]
|
||||
|
||||
runs-on: small
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: false
|
||||
|
||||
- name: Get the last compute release tag
|
||||
id: get-last-compute-release-tag
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
tag=$(gh api -q '[.[].tag_name | select(startswith("release-compute"))][0]'\
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-H "X-GitHub-Api-Version: 2022-11-28" \
|
||||
"/repos/${GITHUB_REPOSITORY}/releases")
|
||||
echo tag=${tag} >> ${GITHUB_OUTPUT}
|
||||
|
||||
- name: Test extension upgrade
|
||||
timeout-minutes: 20
|
||||
env:
|
||||
NEWTAG: latest
|
||||
OLDTAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
|
||||
PG_VERSION: ${{ matrix.pg-version }}
|
||||
FORCE_ALL_UPGRADE_TESTS: true
|
||||
run: ./docker-compose/test_extensions_upgrade.sh
|
||||
|
||||
- name: Print logs and clean up
|
||||
if: always()
|
||||
run: |
|
||||
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml logs || true
|
||||
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml down
|
||||
|
||||
- name: Post to the Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: ${{ vars.SLACK_ON_CALL_QA_STAGING_STREAM }}
|
||||
slack-message: |
|
||||
Test upgrading of extensions: ${{ job.status }}
|
||||
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
41
.github/workflows/regenerate-pg-setting.yml
vendored
41
.github/workflows/regenerate-pg-setting.yml
vendored
@@ -1,41 +0,0 @@
|
||||
name: Regenerate Postgres Settings
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
paths:
|
||||
- pgxn/neon/**.c
|
||||
- vendor/postgres-v*
|
||||
- vendor/revisions.json
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.head_ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
regenerate-pg-settings:
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- name: Add comment
|
||||
uses: thollander/actions-comment-pull-request@v3
|
||||
with:
|
||||
comment-tag: ${{ github.job }}
|
||||
pr-number: ${{ github.event.number }}
|
||||
message: |
|
||||
If this PR added a GUC in the Postgres fork or `neon` extension,
|
||||
please regenerate the Postgres settings in the `cloud` repo:
|
||||
|
||||
```
|
||||
make NEON_WORKDIR=path/to/neon/checkout \
|
||||
-C goapp/internal/shareddomain/postgres generate
|
||||
```
|
||||
|
||||
If you're an external contributor, a Neon employee will assist in
|
||||
making sure this step is done.
|
||||
100
Cargo.lock
generated
100
Cargo.lock
generated
@@ -786,7 +786,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_core"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -815,7 +815,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_identity"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
@@ -834,7 +834,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"async-lock",
|
||||
@@ -852,7 +852,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_storage_blobs"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_core",
|
||||
@@ -872,7 +872,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "azure_svc_blobstorage"
|
||||
version = "0.21.0"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
|
||||
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
|
||||
dependencies = [
|
||||
"azure_core",
|
||||
"bytes",
|
||||
@@ -1029,6 +1029,12 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "boxcar"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
|
||||
|
||||
[[package]]
|
||||
name = "bstr"
|
||||
version = "1.5.0"
|
||||
@@ -1303,7 +1309,6 @@ dependencies = [
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-types",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
@@ -1352,7 +1357,6 @@ dependencies = [
|
||||
"utils",
|
||||
"uuid",
|
||||
"vm_monitor",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"zstd",
|
||||
]
|
||||
@@ -2394,9 +2398,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||
|
||||
[[package]]
|
||||
name = "futures-timer"
|
||||
version = "3.0.2"
|
||||
version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
|
||||
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
@@ -2499,6 +2503,27 @@ version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "governor"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"dashmap 6.1.0",
|
||||
"futures-sink",
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
"no-std-compat",
|
||||
"nonzero_ext",
|
||||
"parking_lot 0.12.1",
|
||||
"portable-atomic",
|
||||
"quanta",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"spinning_top",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.12.1"
|
||||
@@ -3698,6 +3723,12 @@ dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "no-std-compat"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@@ -3708,6 +3739,12 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nonzero_ext"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "8.0.0"
|
||||
@@ -4566,6 +4603,12 @@ dependencies = [
|
||||
"never-say-never",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "portable-atomic"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
|
||||
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.7"
|
||||
@@ -4925,6 +4968,7 @@ dependencies = [
|
||||
"aws-sdk-iam",
|
||||
"aws-sigv4",
|
||||
"base64 0.13.1",
|
||||
"boxcar",
|
||||
"bstr",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -4976,6 +5020,7 @@ dependencies = [
|
||||
"postgres-protocol2",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"rcgen",
|
||||
@@ -5000,6 +5045,7 @@ dependencies = [
|
||||
"smallvec",
|
||||
"smol_str",
|
||||
"socket2",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"subtle",
|
||||
"thiserror 1.0.69",
|
||||
@@ -5014,6 +5060,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-log",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-serde",
|
||||
"tracing-subscriber",
|
||||
"tracing-utils",
|
||||
"try-lock",
|
||||
@@ -5028,6 +5075,21 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
@@ -5158,6 +5220,15 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "11.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.7.0"
|
||||
@@ -6366,6 +6437,15 @@ version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
|
||||
[[package]]
|
||||
name = "spinning_top"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.6.0"
|
||||
@@ -6441,6 +6521,7 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"governor",
|
||||
"hex",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
@@ -6454,7 +6535,6 @@ dependencies = [
|
||||
"pageserver_client",
|
||||
"postgres_connection",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"routerify",
|
||||
"rustls 0.23.18",
|
||||
|
||||
@@ -148,7 +148,7 @@ RUN case $DEBIAN_VERSION in \
|
||||
apt install --no-install-recommends --no-install-suggests -y \
|
||||
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip \
|
||||
$VERSION_INSTALLS \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
@@ -1464,31 +1464,6 @@ RUN make release -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-duckdb-pg-build"
|
||||
# compile pg_duckdb extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg_duckdb-src
|
||||
WORKDIR /ext-src
|
||||
COPY compute/patches/pg_duckdb_v031.patch .
|
||||
# pg_duckdb build requires source dir to be a git repo to get submodules
|
||||
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
|
||||
# - extension management function duckdb.install_extension()
|
||||
# - access to duckdb.extensions table and its sequence
|
||||
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
|
||||
cd pg_duckdb-src && \
|
||||
git submodule update --init --recursive && \
|
||||
patch -p1 < /ext-src/pg_duckdb_v031.patch
|
||||
|
||||
FROM pg-build AS pg_duckdb-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
|
||||
WORKDIR /ext-src/pg_duckdb-src
|
||||
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg_repack"
|
||||
@@ -1602,7 +1577,6 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
#########################################################################################
|
||||
@@ -1695,6 +1669,29 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
|
||||
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
|
||||
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "awscli"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS awscli
|
||||
ARG TARGETARCH
|
||||
RUN set -ex; \
|
||||
if [ "${TARGETARCH}" = "amd64" ]; then \
|
||||
TARGETARCH_ALT="x86_64"; \
|
||||
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
|
||||
elif [ "${TARGETARCH}" = "arm64" ]; then \
|
||||
TARGETARCH_ALT="aarch64"; \
|
||||
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
|
||||
else \
|
||||
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
|
||||
fi; \
|
||||
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
|
||||
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
|
||||
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
|
||||
/tmp/awscliv2/aws/install; \
|
||||
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Clean up postgres folder before inclusion
|
||||
@@ -1864,6 +1861,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
mkdir /usr/local/download_extensions && \
|
||||
chown -R postgres:postgres /usr/local/download_extensions
|
||||
|
||||
# aws cli is used by fast_import
|
||||
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
|
||||
|
||||
# pgbouncer and its config
|
||||
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
|
||||
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
|
||||
index d777d76..af60106 100644
|
||||
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
|
||||
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
|
||||
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
|
||||
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
|
||||
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
|
||||
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
|
||||
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
|
||||
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
|
||||
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;
|
||||
@@ -47,9 +47,7 @@ files:
|
||||
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
|
||||
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
|
||||
# regardless of hostname (ALL)
|
||||
#
|
||||
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
|
||||
- filename: cgconfig.conf
|
||||
content: |
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
|
||||
@@ -14,7 +14,6 @@ base64.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-sdk-kms.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
anyhow.workspace = true
|
||||
axum = { workspace = true, features = [] }
|
||||
camino.workspace = true
|
||||
@@ -55,7 +54,6 @@ thiserror.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
prometheus.workspace = true
|
||||
walkdir.workspace = true
|
||||
|
||||
postgres_initdb.workspace = true
|
||||
compute_api.workspace = true
|
||||
|
||||
@@ -25,10 +25,10 @@
|
||||
//! docker push localhost:3030/localregistry/compute-node-v14:latest
|
||||
//! ```
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use anyhow::Context;
|
||||
use aws_config::BehaviorVersion;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::{Parser, Subcommand};
|
||||
use clap::Parser;
|
||||
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
|
||||
use nix::unistd::Pid;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
@@ -44,59 +44,32 @@ mod s3_uri;
|
||||
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
|
||||
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300);
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
enum Command {
|
||||
/// Runs local postgres (neon binary), restores into it,
|
||||
/// uploads pgdata to s3 to be consumed by pageservers
|
||||
Pgdata {
|
||||
/// Raw connection string to the source database. Used only in tests,
|
||||
/// real scenario uses encrypted connection string in spec.json from s3.
|
||||
#[clap(long)]
|
||||
source_connection_string: Option<String>,
|
||||
/// If specified, will not shut down the local postgres after the import. Used in local testing
|
||||
#[clap(short, long)]
|
||||
interactive: bool,
|
||||
/// Port to run postgres on. Default is 5432.
|
||||
#[clap(long, default_value_t = 5432)]
|
||||
pg_port: u16, // port to run postgres on, 5432 is default
|
||||
|
||||
/// Number of CPUs in the system. This is used to configure # of
|
||||
/// parallel worker processes, for index creation.
|
||||
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
|
||||
num_cpus: Option<usize>,
|
||||
|
||||
/// Amount of RAM in the system. This is used to configure shared_buffers
|
||||
/// and maintenance_work_mem.
|
||||
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
|
||||
memory_mb: Option<usize>,
|
||||
},
|
||||
|
||||
/// Runs pg_dump-pg_restore from source to destination without running local postgres.
|
||||
DumpRestore {
|
||||
/// Raw connection string to the source database. Used only in tests,
|
||||
/// real scenario uses encrypted connection string in spec.json from s3.
|
||||
#[clap(long)]
|
||||
source_connection_string: Option<String>,
|
||||
/// Raw connection string to the destination database. Used only in tests,
|
||||
/// real scenario uses encrypted connection string in spec.json from s3.
|
||||
#[clap(long)]
|
||||
destination_connection_string: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
#[clap(long, env = "NEON_IMPORTER_WORKDIR")]
|
||||
#[clap(long)]
|
||||
working_directory: Utf8PathBuf,
|
||||
#[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
|
||||
s3_prefix: Option<s3_uri::S3Uri>,
|
||||
#[clap(long, env = "NEON_IMPORTER_PG_BIN_DIR")]
|
||||
#[clap(long)]
|
||||
source_connection_string: Option<String>,
|
||||
#[clap(short, long)]
|
||||
interactive: bool,
|
||||
#[clap(long)]
|
||||
pg_bin_dir: Utf8PathBuf,
|
||||
#[clap(long, env = "NEON_IMPORTER_PG_LIB_DIR")]
|
||||
#[clap(long)]
|
||||
pg_lib_dir: Utf8PathBuf,
|
||||
#[clap(long)]
|
||||
pg_port: Option<u16>, // port to run postgres on, 5432 is default
|
||||
|
||||
#[clap(subcommand)]
|
||||
command: Command,
|
||||
/// Number of CPUs in the system. This is used to configure # of
|
||||
/// parallel worker processes, for index creation.
|
||||
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
|
||||
num_cpus: Option<usize>,
|
||||
|
||||
/// Amount of RAM in the system. This is used to configure shared_buffers
|
||||
/// and maintenance_work_mem.
|
||||
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
|
||||
memory_mb: Option<usize>,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
@@ -105,8 +78,6 @@ struct Spec {
|
||||
encryption_secret: EncryptionSecret,
|
||||
#[serde_as(as = "serde_with::base64::Base64")]
|
||||
source_connstring_ciphertext_base64: Vec<u8>,
|
||||
#[serde_as(as = "Option<serde_with::base64::Base64>")]
|
||||
destination_connstring_ciphertext_base64: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
@@ -122,150 +93,192 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
|
||||
"C.UTF-8"
|
||||
};
|
||||
|
||||
async fn decode_connstring(
|
||||
kms_client: &aws_sdk_kms::Client,
|
||||
key_id: &String,
|
||||
connstring_ciphertext_base64: Vec<u8>,
|
||||
) -> Result<String, anyhow::Error> {
|
||||
let mut output = kms_client
|
||||
.decrypt()
|
||||
.key_id(key_id)
|
||||
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
|
||||
connstring_ciphertext_base64,
|
||||
))
|
||||
.send()
|
||||
.await
|
||||
.context("decrypt connection string")?;
|
||||
#[tokio::main]
|
||||
pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
utils::logging::init(
|
||||
utils::logging::LogFormat::Plain,
|
||||
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
utils::logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let plaintext = output
|
||||
.plaintext
|
||||
.take()
|
||||
.context("get plaintext connection string")?;
|
||||
info!("starting");
|
||||
|
||||
String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8")
|
||||
}
|
||||
let args = Args::parse();
|
||||
|
||||
struct PostgresProcess {
|
||||
pgdata_dir: Utf8PathBuf,
|
||||
pg_bin_dir: Utf8PathBuf,
|
||||
pgbin: Utf8PathBuf,
|
||||
pg_lib_dir: Utf8PathBuf,
|
||||
postgres_proc: Option<tokio::process::Child>,
|
||||
}
|
||||
|
||||
impl PostgresProcess {
|
||||
fn new(pgdata_dir: Utf8PathBuf, pg_bin_dir: Utf8PathBuf, pg_lib_dir: Utf8PathBuf) -> Self {
|
||||
Self {
|
||||
pgdata_dir,
|
||||
pgbin: pg_bin_dir.join("postgres"),
|
||||
pg_bin_dir,
|
||||
pg_lib_dir,
|
||||
postgres_proc: None,
|
||||
}
|
||||
// Validate arguments
|
||||
if args.s3_prefix.is_none() && args.source_connection_string.is_none() {
|
||||
anyhow::bail!("either s3_prefix or source_connection_string must be specified");
|
||||
}
|
||||
if args.s3_prefix.is_some() && args.source_connection_string.is_some() {
|
||||
anyhow::bail!("only one of s3_prefix or source_connection_string can be specified");
|
||||
}
|
||||
|
||||
async fn prepare(&self, initdb_user: &str) -> Result<(), anyhow::Error> {
|
||||
tokio::fs::create_dir(&self.pgdata_dir)
|
||||
.await
|
||||
.context("create pgdata directory")?;
|
||||
let working_directory = args.working_directory;
|
||||
let pg_bin_dir = args.pg_bin_dir;
|
||||
let pg_lib_dir = args.pg_lib_dir;
|
||||
let pg_port = args.pg_port.unwrap_or_else(|| {
|
||||
info!("pg_port not specified, using default 5432");
|
||||
5432
|
||||
});
|
||||
|
||||
let pg_version = match get_pg_version(self.pgbin.as_ref()) {
|
||||
PostgresMajorVersion::V14 => 14,
|
||||
PostgresMajorVersion::V15 => 15,
|
||||
PostgresMajorVersion::V16 => 16,
|
||||
PostgresMajorVersion::V17 => 17,
|
||||
// Initialize AWS clients only if s3_prefix is specified
|
||||
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
|
||||
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
|
||||
let kms = aws_sdk_kms::Client::new(&config);
|
||||
(Some(config), Some(kms))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
// Get source connection string either from S3 spec or direct argument
|
||||
let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
|
||||
let spec: Spec = {
|
||||
let spec_key = s3_prefix.append("/spec.json");
|
||||
let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
|
||||
let object = s3_client
|
||||
.get_object()
|
||||
.bucket(&spec_key.bucket)
|
||||
.key(spec_key.key)
|
||||
.send()
|
||||
.await
|
||||
.context("get spec from s3")?
|
||||
.body
|
||||
.collect()
|
||||
.await
|
||||
.context("download spec body")?;
|
||||
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
|
||||
};
|
||||
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser: initdb_user,
|
||||
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
|
||||
pg_version,
|
||||
initdb_bin: self.pg_bin_dir.join("initdb").as_ref(),
|
||||
library_search_path: &self.pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
|
||||
pgdata: &self.pgdata_dir,
|
||||
})
|
||||
|
||||
match spec.encryption_secret {
|
||||
EncryptionSecret::KMS { key_id } => {
|
||||
let mut output = kms_client
|
||||
.unwrap()
|
||||
.decrypt()
|
||||
.key_id(key_id)
|
||||
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
|
||||
spec.source_connstring_ciphertext_base64,
|
||||
))
|
||||
.send()
|
||||
.await
|
||||
.context("decrypt source connection string")?;
|
||||
let plaintext = output
|
||||
.plaintext
|
||||
.take()
|
||||
.context("get plaintext source connection string")?;
|
||||
String::from_utf8(plaintext.into_inner())
|
||||
.context("parse source connection string as utf8")?
|
||||
}
|
||||
}
|
||||
} else {
|
||||
args.source_connection_string.unwrap()
|
||||
};
|
||||
|
||||
match tokio::fs::create_dir(&working_directory).await {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
if !is_directory_empty(&working_directory)
|
||||
.await
|
||||
.context("check if working directory is empty")?
|
||||
{
|
||||
anyhow::bail!("working directory is not empty");
|
||||
} else {
|
||||
// ok
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
|
||||
}
|
||||
|
||||
let pgdata_dir = working_directory.join("pgdata");
|
||||
tokio::fs::create_dir(&pgdata_dir)
|
||||
.await
|
||||
.context("initdb")
|
||||
}
|
||||
.context("create pgdata directory")?;
|
||||
|
||||
async fn start(
|
||||
&mut self,
|
||||
initdb_user: &str,
|
||||
port: u16,
|
||||
nproc: usize,
|
||||
memory_mb: usize,
|
||||
) -> Result<&tokio::process::Child, anyhow::Error> {
|
||||
self.prepare(initdb_user).await?;
|
||||
let pgbin = pg_bin_dir.join("postgres");
|
||||
let pg_version = match get_pg_version(pgbin.as_ref()) {
|
||||
PostgresMajorVersion::V14 => 14,
|
||||
PostgresMajorVersion::V15 => 15,
|
||||
PostgresMajorVersion::V16 => 16,
|
||||
PostgresMajorVersion::V17 => 17,
|
||||
};
|
||||
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
|
||||
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser,
|
||||
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
|
||||
pg_version,
|
||||
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
|
||||
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
|
||||
pgdata: &pgdata_dir,
|
||||
})
|
||||
.await
|
||||
.context("initdb")?;
|
||||
|
||||
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
|
||||
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
|
||||
// available for misc other stuff that PostgreSQL uses memory for.
|
||||
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
|
||||
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
|
||||
// If the caller didn't specify CPU / RAM to use for sizing, default to
|
||||
// number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
|
||||
let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
|
||||
let memory_mb = args.memory_mb.unwrap_or(256);
|
||||
|
||||
//
|
||||
// Launch postgres process
|
||||
//
|
||||
let mut proc = tokio::process::Command::new(&self.pgbin)
|
||||
.arg("-D")
|
||||
.arg(&self.pgdata_dir)
|
||||
.args(["-p", &format!("{port}")])
|
||||
.args(["-c", "wal_level=minimal"])
|
||||
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
|
||||
.args(["-c", "max_wal_senders=0"])
|
||||
.args(["-c", "fsync=off"])
|
||||
.args(["-c", "full_page_writes=off"])
|
||||
.args(["-c", "synchronous_commit=off"])
|
||||
.args([
|
||||
"-c",
|
||||
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
|
||||
])
|
||||
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
|
||||
.args(["-c", &format!("max_parallel_workers={nproc}")])
|
||||
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
|
||||
.args(["-c", &format!("max_worker_processes={nproc}")])
|
||||
.args(["-c", "effective_io_concurrency=100"])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &self.pg_lib_dir)
|
||||
.env(
|
||||
"ASAN_OPTIONS",
|
||||
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.env(
|
||||
"UBSAN_OPTIONS",
|
||||
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.context("spawn postgres")?;
|
||||
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
|
||||
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
|
||||
// available for misc other stuff that PostgreSQL uses memory for.
|
||||
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
|
||||
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
|
||||
|
||||
info!("spawned postgres, waiting for it to become ready");
|
||||
tokio::spawn(
|
||||
child_stdio_to_log::relay_process_output(proc.stdout.take(), proc.stderr.take())
|
||||
.instrument(info_span!("postgres")),
|
||||
);
|
||||
|
||||
self.postgres_proc = Some(proc);
|
||||
Ok(self.postgres_proc.as_ref().unwrap())
|
||||
}
|
||||
|
||||
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
|
||||
let proc: &mut tokio::process::Child = self.postgres_proc.as_mut().unwrap();
|
||||
info!("shutdown postgres");
|
||||
nix::sys::signal::kill(
|
||||
Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")),
|
||||
nix::sys::signal::SIGTERM,
|
||||
//
|
||||
// Launch postgres process
|
||||
//
|
||||
let mut postgres_proc = tokio::process::Command::new(pgbin)
|
||||
.arg("-D")
|
||||
.arg(&pgdata_dir)
|
||||
.args(["-p", &format!("{pg_port}")])
|
||||
.args(["-c", "wal_level=minimal"])
|
||||
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
|
||||
.args(["-c", "max_wal_senders=0"])
|
||||
.args(["-c", "fsync=off"])
|
||||
.args(["-c", "full_page_writes=off"])
|
||||
.args(["-c", "synchronous_commit=off"])
|
||||
.args([
|
||||
"-c",
|
||||
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
|
||||
])
|
||||
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
|
||||
.args(["-c", &format!("max_parallel_workers={nproc}")])
|
||||
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
|
||||
.args(["-c", &format!("max_worker_processes={nproc}")])
|
||||
.args([
|
||||
"-c",
|
||||
&format!(
|
||||
"effective_io_concurrency={}",
|
||||
if cfg!(target_os = "macos") { 0 } else { 100 }
|
||||
),
|
||||
])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir)
|
||||
.env(
|
||||
"ASAN_OPTIONS",
|
||||
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.context("signal postgres to shut down")?;
|
||||
proc.wait()
|
||||
.await
|
||||
.context("wait for postgres to shut down")
|
||||
.map(|_| ())
|
||||
}
|
||||
}
|
||||
.env(
|
||||
"UBSAN_OPTIONS",
|
||||
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
|
||||
)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.context("spawn postgres")?;
|
||||
|
||||
info!("spawned postgres, waiting for it to become ready");
|
||||
tokio::spawn(
|
||||
child_stdio_to_log::relay_process_output(
|
||||
postgres_proc.stdout.take(),
|
||||
postgres_proc.stderr.take(),
|
||||
)
|
||||
.instrument(info_span!("postgres")),
|
||||
);
|
||||
|
||||
async fn wait_until_ready(connstring: String, create_dbname: String) {
|
||||
// Create neondb database in the running postgres
|
||||
let restore_pg_connstring =
|
||||
format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
|
||||
|
||||
let start_time = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
@@ -276,12 +289,7 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
match tokio_postgres::connect(
|
||||
&connstring.replace("dbname=neondb", "dbname=postgres"),
|
||||
tokio_postgres::NoTls,
|
||||
)
|
||||
.await
|
||||
{
|
||||
match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
|
||||
Ok((client, connection)) => {
|
||||
// Spawn the connection handling task to maintain the connection
|
||||
tokio::spawn(async move {
|
||||
@@ -290,12 +298,9 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
|
||||
}
|
||||
});
|
||||
|
||||
match client
|
||||
.simple_query(format!("CREATE DATABASE {create_dbname};").as_str())
|
||||
.await
|
||||
{
|
||||
match client.simple_query("CREATE DATABASE neondb;").await {
|
||||
Ok(_) => {
|
||||
info!("created {} database", create_dbname);
|
||||
info!("created neondb database");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -319,16 +324,10 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_dump_restore(
|
||||
workdir: Utf8PathBuf,
|
||||
pg_bin_dir: Utf8PathBuf,
|
||||
pg_lib_dir: Utf8PathBuf,
|
||||
source_connstring: String,
|
||||
destination_connstring: String,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let dumpdir = workdir.join("dumpdir");
|
||||
let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
|
||||
|
||||
let dumpdir = working_directory.join("dumpdir");
|
||||
|
||||
let common_args = [
|
||||
// schema mapping (prob suffices to specify them on one side)
|
||||
@@ -357,7 +356,7 @@ async fn run_dump_restore(
|
||||
.arg("--no-sync")
|
||||
// POSITIONAL args
|
||||
// source db (db name included in connection string)
|
||||
.arg(&source_connstring)
|
||||
.arg(&source_connection_string)
|
||||
// how we run it
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir)
|
||||
@@ -377,18 +376,19 @@ async fn run_dump_restore(
|
||||
let st = pg_dump.wait().await.context("wait for pg_dump")?;
|
||||
info!(status=?st, "pg_dump exited");
|
||||
if !st.success() {
|
||||
error!(status=%st, "pg_dump failed, restore will likely fail as well");
|
||||
bail!("pg_dump failed");
|
||||
warn!(status=%st, "pg_dump failed, restore will likely fail as well");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: maybe do it in a streaming way, plenty of internal research done on this already
|
||||
// TODO: do it in a streaming way, plenty of internal research done on this already
|
||||
// TODO: do the unlogged table trick
|
||||
|
||||
info!("restore from working directory into vanilla postgres");
|
||||
{
|
||||
let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
|
||||
.args(&common_args)
|
||||
.arg("-d")
|
||||
.arg(&destination_connstring)
|
||||
.arg(&restore_pg_connstring)
|
||||
// POSITIONAL args
|
||||
.arg(&dumpdir)
|
||||
// how we run it
|
||||
@@ -411,259 +411,48 @@ async fn run_dump_restore(
|
||||
let st = pg_restore.wait().await.context("wait for pg_restore")?;
|
||||
info!(status=?st, "pg_restore exited");
|
||||
if !st.success() {
|
||||
error!(status=%st, "pg_restore failed, restore will likely fail as well");
|
||||
bail!("pg_restore failed");
|
||||
warn!(status=%st, "pg_restore failed, restore will likely fail as well");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn cmd_pgdata(
|
||||
s3_client: Option<aws_sdk_s3::Client>,
|
||||
kms_client: Option<aws_sdk_kms::Client>,
|
||||
maybe_s3_prefix: Option<s3_uri::S3Uri>,
|
||||
maybe_spec: Option<Spec>,
|
||||
source_connection_string: Option<String>,
|
||||
interactive: bool,
|
||||
pg_port: u16,
|
||||
workdir: Utf8PathBuf,
|
||||
pg_bin_dir: Utf8PathBuf,
|
||||
pg_lib_dir: Utf8PathBuf,
|
||||
num_cpus: Option<usize>,
|
||||
memory_mb: Option<usize>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
if maybe_spec.is_none() && source_connection_string.is_none() {
|
||||
bail!("spec must be provided for pgdata command");
|
||||
}
|
||||
if maybe_spec.is_some() && source_connection_string.is_some() {
|
||||
bail!("only one of spec or source_connection_string can be provided");
|
||||
}
|
||||
|
||||
let source_connection_string = if let Some(spec) = maybe_spec {
|
||||
match spec.encryption_secret {
|
||||
EncryptionSecret::KMS { key_id } => {
|
||||
decode_connstring(
|
||||
kms_client.as_ref().unwrap(),
|
||||
&key_id,
|
||||
spec.source_connstring_ciphertext_base64,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
} else {
|
||||
source_connection_string.unwrap()
|
||||
};
|
||||
|
||||
let superuser = "cloud_admin";
|
||||
let destination_connstring = format!(
|
||||
"host=localhost port={} user={} dbname=neondb",
|
||||
pg_port, superuser
|
||||
);
|
||||
|
||||
let pgdata_dir = workdir.join("pgdata");
|
||||
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());
|
||||
let nproc = num_cpus.unwrap_or_else(num_cpus::get);
|
||||
let memory_mb = memory_mb.unwrap_or(256);
|
||||
proc.start(superuser, pg_port, nproc, memory_mb).await?;
|
||||
wait_until_ready(destination_connstring.clone(), "neondb".to_string()).await;
|
||||
|
||||
run_dump_restore(
|
||||
workdir.clone(),
|
||||
pg_bin_dir,
|
||||
pg_lib_dir,
|
||||
source_connection_string,
|
||||
destination_connstring,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// If interactive mode, wait for Ctrl+C
|
||||
if interactive {
|
||||
if args.interactive {
|
||||
info!("Running in interactive mode. Press Ctrl+C to shut down.");
|
||||
tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
|
||||
}
|
||||
|
||||
proc.shutdown().await?;
|
||||
info!("shutdown postgres");
|
||||
{
|
||||
nix::sys::signal::kill(
|
||||
Pid::from_raw(
|
||||
i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
|
||||
),
|
||||
nix::sys::signal::SIGTERM,
|
||||
)
|
||||
.context("signal postgres to shut down")?;
|
||||
postgres_proc
|
||||
.wait()
|
||||
.await
|
||||
.context("wait for postgres to shut down")?;
|
||||
}
|
||||
|
||||
// Only sync if s3_prefix was specified
|
||||
if let Some(s3_prefix) = maybe_s3_prefix {
|
||||
if let Some(s3_prefix) = args.s3_prefix {
|
||||
info!("upload pgdata");
|
||||
aws_s3_sync::upload_dir_recursive(
|
||||
s3_client.as_ref().unwrap(),
|
||||
Utf8Path::new(&pgdata_dir),
|
||||
&s3_prefix.append("/pgdata/"),
|
||||
)
|
||||
.await
|
||||
.context("sync dump directory to destination")?;
|
||||
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
|
||||
.await
|
||||
.context("sync dump directory to destination")?;
|
||||
|
||||
info!("write status");
|
||||
{
|
||||
let status_dir = workdir.join("status");
|
||||
let status_dir = working_directory.join("status");
|
||||
std::fs::create_dir(&status_dir).context("create status directory")?;
|
||||
let status_file = status_dir.join("pgdata");
|
||||
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
|
||||
.context("write status file")?;
|
||||
aws_s3_sync::upload_dir_recursive(
|
||||
s3_client.as_ref().unwrap(),
|
||||
&status_dir,
|
||||
&s3_prefix.append("/status/"),
|
||||
)
|
||||
.await
|
||||
.context("sync status directory to destination")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cmd_dumprestore(
|
||||
kms_client: Option<aws_sdk_kms::Client>,
|
||||
maybe_spec: Option<Spec>,
|
||||
source_connection_string: Option<String>,
|
||||
destination_connection_string: Option<String>,
|
||||
workdir: Utf8PathBuf,
|
||||
pg_bin_dir: Utf8PathBuf,
|
||||
pg_lib_dir: Utf8PathBuf,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let (source_connstring, destination_connstring) = if let Some(spec) = maybe_spec {
|
||||
match spec.encryption_secret {
|
||||
EncryptionSecret::KMS { key_id } => {
|
||||
let source = decode_connstring(
|
||||
kms_client.as_ref().unwrap(),
|
||||
&key_id,
|
||||
spec.source_connstring_ciphertext_base64,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let dest = if let Some(dest_ciphertext) =
|
||||
spec.destination_connstring_ciphertext_base64
|
||||
{
|
||||
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
|
||||
.await?
|
||||
} else {
|
||||
bail!("destination connection string must be provided in spec for dump_restore command");
|
||||
};
|
||||
|
||||
(source, dest)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(
|
||||
source_connection_string.unwrap(),
|
||||
if let Some(val) = destination_connection_string {
|
||||
val
|
||||
} else {
|
||||
bail!("destination connection string must be provided for dump_restore command");
|
||||
},
|
||||
)
|
||||
};
|
||||
|
||||
run_dump_restore(
|
||||
workdir,
|
||||
pg_bin_dir,
|
||||
pg_lib_dir,
|
||||
source_connstring,
|
||||
destination_connstring,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
utils::logging::init(
|
||||
utils::logging::LogFormat::Json,
|
||||
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
utils::logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
info!("starting");
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
// Initialize AWS clients only if s3_prefix is specified
|
||||
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
|
||||
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
|
||||
let s3_client = aws_sdk_s3::Client::new(&config);
|
||||
let kms = aws_sdk_kms::Client::new(&config);
|
||||
(Some(s3_client), Some(kms))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
|
||||
let spec_key = s3_prefix.append("/spec.json");
|
||||
let object = s3_client
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get_object()
|
||||
.bucket(&spec_key.bucket)
|
||||
.key(spec_key.key)
|
||||
.send()
|
||||
.await
|
||||
.context("get spec from s3")?
|
||||
.body
|
||||
.collect()
|
||||
.await
|
||||
.context("download spec body")?;
|
||||
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match tokio::fs::create_dir(&args.working_directory).await {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
if !is_directory_empty(&args.working_directory)
|
||||
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
|
||||
.await
|
||||
.context("check if working directory is empty")?
|
||||
{
|
||||
bail!("working directory is not empty");
|
||||
} else {
|
||||
// ok
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
|
||||
}
|
||||
|
||||
match args.command {
|
||||
Command::Pgdata {
|
||||
source_connection_string,
|
||||
interactive,
|
||||
pg_port,
|
||||
num_cpus,
|
||||
memory_mb,
|
||||
} => {
|
||||
cmd_pgdata(
|
||||
s3_client,
|
||||
kms_client,
|
||||
args.s3_prefix,
|
||||
spec,
|
||||
source_connection_string,
|
||||
interactive,
|
||||
pg_port,
|
||||
args.working_directory,
|
||||
args.pg_bin_dir,
|
||||
args.pg_lib_dir,
|
||||
num_cpus,
|
||||
memory_mb,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::DumpRestore {
|
||||
source_connection_string,
|
||||
destination_connection_string,
|
||||
} => {
|
||||
cmd_dumprestore(
|
||||
kms_client,
|
||||
spec,
|
||||
source_connection_string,
|
||||
destination_connection_string,
|
||||
args.working_directory,
|
||||
args.pg_bin_dir,
|
||||
args.pg_lib_dir,
|
||||
)
|
||||
.await?;
|
||||
.context("sync status directory to destination")?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,102 +1,24 @@
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use tokio::task::JoinSet;
|
||||
use walkdir::WalkDir;
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
|
||||
use super::s3_uri::S3Uri;
|
||||
|
||||
use tracing::{info, warn};
|
||||
|
||||
const MAX_PARALLEL_UPLOADS: usize = 10;
|
||||
|
||||
/// Upload all files from 'local' to 'remote'
|
||||
pub(crate) async fn upload_dir_recursive(
|
||||
s3_client: &aws_sdk_s3::Client,
|
||||
local: &Utf8Path,
|
||||
remote: &S3Uri,
|
||||
) -> anyhow::Result<()> {
|
||||
// Recursively scan directory
|
||||
let mut dirwalker = WalkDir::new(local)
|
||||
.into_iter()
|
||||
.map(|entry| {
|
||||
let entry = entry?;
|
||||
let file_type = entry.file_type();
|
||||
let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
|
||||
Ok((file_type, path))
|
||||
})
|
||||
.filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
|
||||
match e {
|
||||
Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
|
||||
Ok((file_type, _path)) if file_type.is_dir() => {
|
||||
// The WalkDir iterator will recurse into directories, but we don't want
|
||||
// to do anything with directories as such. There's no concept of uploading
|
||||
// an empty directory to S3.
|
||||
None
|
||||
}
|
||||
Ok((file_type, path)) if file_type.is_symlink() => {
|
||||
// huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
|
||||
warn!("cannot upload symlink ({})", path);
|
||||
None
|
||||
}
|
||||
Ok((_file_type, path)) => {
|
||||
// should not happen
|
||||
warn!("directory entry has unexpected type ({})", path);
|
||||
None
|
||||
}
|
||||
Err(e) => Some(Err(e)),
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
|
||||
// parallel.
|
||||
let mut joinset = JoinSet::new();
|
||||
loop {
|
||||
// Could we upload more?
|
||||
while joinset.len() < MAX_PARALLEL_UPLOADS {
|
||||
if let Some(full_local_path) = dirwalker.next() {
|
||||
let full_local_path = full_local_path?;
|
||||
let relative_local_path = full_local_path
|
||||
.strip_prefix(local)
|
||||
.expect("all paths start from the walkdir root");
|
||||
let remote_path = remote.append(relative_local_path.as_str());
|
||||
info!(
|
||||
"starting upload of {} to {}",
|
||||
&full_local_path, &remote_path
|
||||
);
|
||||
let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
|
||||
joinset.spawn(upload_task);
|
||||
} else {
|
||||
info!("draining upload tasks");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for an upload to complete
|
||||
if let Some(res) = joinset.join_next().await {
|
||||
let _ = res?;
|
||||
} else {
|
||||
// all done!
|
||||
break;
|
||||
}
|
||||
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
|
||||
let mut builder = tokio::process::Command::new("aws");
|
||||
builder
|
||||
.arg("s3")
|
||||
.arg("sync")
|
||||
.arg(local.as_str())
|
||||
.arg(remote.to_string());
|
||||
let st = builder
|
||||
.spawn()
|
||||
.context("spawn aws s3 sync")?
|
||||
.wait()
|
||||
.await
|
||||
.context("wait for aws s3 sync")?;
|
||||
if st.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!("aws s3 sync failed"))
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn upload_file(
|
||||
s3_client: aws_sdk_s3::Client,
|
||||
local_path: Utf8PathBuf,
|
||||
remote: S3Uri,
|
||||
) -> anyhow::Result<()> {
|
||||
use aws_smithy_types::byte_stream::ByteStream;
|
||||
let stream = ByteStream::from_path(&local_path).await?;
|
||||
|
||||
let _result = s3_client
|
||||
.put_object()
|
||||
.bucket(remote.bucket)
|
||||
.key(&remote.key)
|
||||
.body(stream)
|
||||
.send()
|
||||
.await?;
|
||||
info!("upload of {} to {} finished", &local_path, &remote.key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use std::{
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::Request,
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
@@ -17,7 +16,6 @@ use axum::{
|
||||
use http::StatusCode;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::classify::ServerErrorsFailureClass;
|
||||
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
|
||||
use tracing::{debug, error, info, Span};
|
||||
use uuid::Uuid;
|
||||
@@ -86,85 +84,46 @@ impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
.route("/terminate", post(terminate::terminate)),
|
||||
};
|
||||
|
||||
router
|
||||
.fallback(Server::handle_404)
|
||||
.method_not_allowed_fallback(Server::handle_405)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(|request: &Request<Body>| {
|
||||
let request_id = request
|
||||
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
debug!(%request_id, "{} {}", request.method(), request.uri())
|
||||
}
|
||||
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
tracing::span!(
|
||||
tracing::Level::DEBUG,
|
||||
"",
|
||||
method = tracing::field::display(request.method()),
|
||||
uri = tracing::field::display(request.uri()),
|
||||
request_id = tracing::field::display(request_id)
|
||||
)
|
||||
}
|
||||
_ => tracing::span!(
|
||||
tracing::Level::INFO,
|
||||
"",
|
||||
method = tracing::field::display(request.method()),
|
||||
uri = tracing::field::display(request.uri()),
|
||||
request_id = tracing::field::display(request_id)
|
||||
),
|
||||
}
|
||||
})
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
match request.uri().path() {
|
||||
"/metrics" => debug!("incoming request"),
|
||||
_ => info!("incoming request"),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
// All errors will be logged in the on_failure handler
|
||||
if let 200..=399 = response.status().as_u16() {
|
||||
info!(
|
||||
message = "request finished",
|
||||
code = %response.status().as_u16(),
|
||||
latency_ms = %latency.as_millis()
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
.on_failure(
|
||||
|error: ServerErrorsFailureClass,
|
||||
latency: Duration,
|
||||
_span: &Span| {
|
||||
match error {
|
||||
ServerErrorsFailureClass::StatusCode(code) => {
|
||||
error!(
|
||||
message = "request failed",
|
||||
code = %code,
|
||||
latency_ms = %latency.as_millis()
|
||||
);
|
||||
}
|
||||
ServerErrorsFailureClass::Error(error) => {
|
||||
error!(
|
||||
message = "request failed unexpectedly",
|
||||
error = %error,
|
||||
latency_ms = %latency.as_millis()
|
||||
);
|
||||
}
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ if [ -z ${OLDTAG+x} ] || [ -z ${NEWTAG+x} ] || [ -z "${OLDTAG}" ] || [ -z "${NEW
|
||||
exit 1
|
||||
fi
|
||||
export PG_VERSION=${PG_VERSION:-16}
|
||||
export PG_TEST_VERSION=${PG_VERSION}
|
||||
function wait_for_ready {
|
||||
TIME=0
|
||||
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
|
||||
@@ -60,12 +59,8 @@ docker compose cp ext-src neon-test-extensions:/
|
||||
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
|
||||
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
|
||||
create_extensions "${EXTNAMES}"
|
||||
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
|
||||
exts="${EXTNAMES}"
|
||||
else
|
||||
query="select pge.extname from pg_extension pge join (select key as extname, value as extversion from json_each_text('${new_vers}')) x on pge.extname=x.extname and pge.extversion <> x.extversion"
|
||||
exts=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
|
||||
fi
|
||||
query="select pge.extname from pg_extension pge join (select key as extname, value as extversion from json_each_text('${new_vers}')) x on pge.extname=x.extname and pge.extversion <> x.extversion"
|
||||
exts=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
|
||||
if [ -z "${exts}" ]; then
|
||||
echo "No extensions were upgraded"
|
||||
else
|
||||
@@ -93,10 +88,7 @@ else
|
||||
exit 1
|
||||
fi
|
||||
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
|
||||
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
|
||||
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs
|
||||
exit 1
|
||||
fi
|
||||
docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh
|
||||
docker compose exec neon-test-extensions psql -d contrib_regression -c "alter extension ${ext} update"
|
||||
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
|
||||
done
|
||||
|
||||
@@ -351,7 +351,7 @@ pub struct TenantConfigToml {
|
||||
|
||||
/// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
|
||||
/// `index_part.json`, and it cannot be reversed.
|
||||
pub rel_size_v2_enabled: bool,
|
||||
pub rel_size_v2_enabled: Option<bool>,
|
||||
|
||||
// gc-compaction related configs
|
||||
/// Enable automatic gc-compaction trigger on this tenant.
|
||||
@@ -633,7 +633,7 @@ impl Default for TenantConfigToml {
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
timeline_offloading: true,
|
||||
wal_receiver_protocol_override: None,
|
||||
rel_size_v2_enabled: false,
|
||||
rel_size_v2_enabled: None,
|
||||
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
use anyhow::{bail, Result};
|
||||
use byteorder::{ByteOrder, BE};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::Oid;
|
||||
use postgres_ffi::RepOriginId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fmt, ops::Range};
|
||||
use utils::const_assert;
|
||||
|
||||
use crate::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
|
||||
@@ -51,64 +49,6 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
|
||||
/// The key prefix of ReplOrigin keys.
|
||||
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
|
||||
|
||||
/// The key prefix of db directory keys.
|
||||
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
|
||||
|
||||
/// The key prefix of rel directory keys.
|
||||
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub enum RelDirExists {
|
||||
Exists,
|
||||
Removed,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DecodeError;
|
||||
|
||||
impl fmt::Display for DecodeError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "invalid marker")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DecodeError {}
|
||||
|
||||
impl RelDirExists {
|
||||
/// The value of the rel directory keys that indicates the existence of a relation.
|
||||
const REL_EXISTS_MARKER: Bytes = Bytes::from_static(b"r");
|
||||
|
||||
pub fn encode(&self) -> Bytes {
|
||||
match self {
|
||||
Self::Exists => Self::REL_EXISTS_MARKER.clone(),
|
||||
Self::Removed => SPARSE_TOMBSTONE_MARKER.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode_option(data: Option<impl AsRef<[u8]>>) -> Result<Self, DecodeError> {
|
||||
match data {
|
||||
Some(marker) if marker.as_ref() == Self::REL_EXISTS_MARKER => Ok(Self::Exists),
|
||||
// Any other marker is invalid
|
||||
Some(_) => Err(DecodeError),
|
||||
None => Ok(Self::Removed),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode(data: impl AsRef<[u8]>) -> Result<Self, DecodeError> {
|
||||
let data = data.as_ref();
|
||||
if data == Self::REL_EXISTS_MARKER {
|
||||
Ok(Self::Exists)
|
||||
} else if data == SPARSE_TOMBSTONE_MARKER {
|
||||
Ok(Self::Removed)
|
||||
} else {
|
||||
Err(DecodeError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A tombstone in the sparse keyspace, which is an empty buffer.
|
||||
pub const SPARSE_TOMBSTONE_MARKER: Bytes = Bytes::from_static(b"");
|
||||
|
||||
/// Check if the key falls in the range of metadata keys.
|
||||
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
|
||||
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
|
||||
@@ -170,24 +110,6 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rel_dir_sparse_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: REL_DIR_KEY_PREFIX + 1,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// This function checks more extensively what keys we can take on the write path.
|
||||
/// If a key beginning with 00 does not have a global/default tablespace OID, it
|
||||
/// will be rejected on the write path.
|
||||
@@ -518,36 +440,6 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: relnode,
|
||||
field5: forknum,
|
||||
field6: 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
} // it's fine to exclude the last key b/c we only use field6 == 1
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
||||
Key {
|
||||
@@ -842,9 +734,9 @@ impl Key {
|
||||
self.field1 == RELATION_SIZE_PREFIX
|
||||
}
|
||||
|
||||
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
|
||||
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
|
||||
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
|
||||
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
|
||||
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
|
||||
Key {
|
||||
field1: AUX_KEY_PREFIX,
|
||||
field2: 0,
|
||||
|
||||
@@ -1144,7 +1144,6 @@ pub struct TimelineInfo {
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
#[serde(default)]
|
||||
pub applied_gc_cutoff_lsn: Lsn,
|
||||
|
||||
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
|
||||
@@ -1153,7 +1152,6 @@ pub struct TimelineInfo {
|
||||
///
|
||||
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
|
||||
/// than this LSN, but new leases may not be taken out earlier than this LSN.
|
||||
#[serde(default)]
|
||||
pub min_readable_lsn: Lsn,
|
||||
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
|
||||
@@ -9,8 +9,6 @@ use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::ErrorKind;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{ready, Poll};
|
||||
@@ -270,7 +268,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
|
||||
}
|
||||
|
||||
pub struct PostgresBackend<IO> {
|
||||
pub socket_fd: RawFd,
|
||||
framed: MaybeWriteOnly<IO>,
|
||||
|
||||
pub state: ProtoState,
|
||||
@@ -296,11 +293,9 @@ impl PostgresBackend<tokio::net::TcpStream> {
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
) -> io::Result<Self> {
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
let stream = MaybeTlsStream::Unencrypted(socket);
|
||||
|
||||
Ok(Self {
|
||||
socket_fd,
|
||||
framed: MaybeWriteOnly::Full(Framed::new(stream)),
|
||||
state: ProtoState::Initialization,
|
||||
auth_type,
|
||||
@@ -312,7 +307,6 @@ impl PostgresBackend<tokio::net::TcpStream> {
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
pub fn new_from_io(
|
||||
socket_fd: RawFd,
|
||||
socket: IO,
|
||||
peer_addr: SocketAddr,
|
||||
auth_type: AuthType,
|
||||
@@ -321,7 +315,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
let stream = MaybeTlsStream::Unencrypted(socket);
|
||||
|
||||
Ok(Self {
|
||||
socket_fd,
|
||||
framed: MaybeWriteOnly::Full(Framed::new(stream)),
|
||||
state: ProtoState::Initialization,
|
||||
auth_type,
|
||||
|
||||
@@ -10,8 +10,8 @@ use crate::simple_query::SimpleQueryStream;
|
||||
use crate::types::{Oid, ToSql, Type};
|
||||
|
||||
use crate::{
|
||||
query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
|
||||
SimpleQueryMessage, Statement, Transaction, TransactionBuilder,
|
||||
prepare, query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
|
||||
SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
|
||||
};
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
@@ -54,18 +54,18 @@ impl Responses {
|
||||
}
|
||||
|
||||
/// A cache of type info and prepared statements for fetching type info
|
||||
/// (corresponding to the queries in the [crate::prepare] module).
|
||||
/// (corresponding to the queries in the [prepare] module).
|
||||
#[derive(Default)]
|
||||
struct CachedTypeInfo {
|
||||
/// A statement for basic information for a type from its
|
||||
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
|
||||
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
|
||||
/// fallback).
|
||||
typeinfo: Option<Statement>,
|
||||
/// A statement for getting information for a composite type from its OID.
|
||||
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY).
|
||||
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
|
||||
typeinfo_composite: Option<Statement>,
|
||||
/// A statement for getting information for a composite type from its OID.
|
||||
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY) (or
|
||||
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
|
||||
/// its fallback).
|
||||
typeinfo_enum: Option<Statement>,
|
||||
|
||||
@@ -190,6 +190,26 @@ impl Client {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
/// Creates a new prepared statement.
|
||||
///
|
||||
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
|
||||
/// which are set when executed. Prepared statements can only be used with the connection that created them.
|
||||
pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
|
||||
self.prepare_typed(query, &[]).await
|
||||
}
|
||||
|
||||
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
|
||||
///
|
||||
/// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
|
||||
/// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
|
||||
pub async fn prepare_typed(
|
||||
&self,
|
||||
query: &str,
|
||||
parameter_types: &[Type],
|
||||
) -> Result<Statement, Error> {
|
||||
prepare::prepare(&self.inner, query, parameter_types).await
|
||||
}
|
||||
|
||||
/// Executes a statement, returning a vector of the resulting rows.
|
||||
///
|
||||
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
|
||||
@@ -202,11 +222,14 @@ impl Client {
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the number of parameters provided does not match the number expected.
|
||||
pub async fn query(
|
||||
pub async fn query<T>(
|
||||
&self,
|
||||
statement: Statement,
|
||||
statement: &T,
|
||||
params: &[&(dyn ToSql + Sync)],
|
||||
) -> Result<Vec<Row>, Error> {
|
||||
) -> Result<Vec<Row>, Error>
|
||||
where
|
||||
T: ?Sized + ToStatement,
|
||||
{
|
||||
self.query_raw(statement, slice_iter(params))
|
||||
.await?
|
||||
.try_collect()
|
||||
@@ -227,15 +250,13 @@ impl Client {
|
||||
/// Panics if the number of parameters provided does not match the number expected.
|
||||
///
|
||||
/// [`query`]: #method.query
|
||||
pub async fn query_raw<'a, I>(
|
||||
&self,
|
||||
statement: Statement,
|
||||
params: I,
|
||||
) -> Result<RowStream, Error>
|
||||
pub async fn query_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
T: ?Sized + ToStatement,
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let statement = statement.__convert().into_statement(self).await?;
|
||||
query::query(&self.inner, statement, params).await
|
||||
}
|
||||
|
||||
@@ -250,6 +271,55 @@ impl Client {
|
||||
query::query_txt(&self.inner, statement, params).await
|
||||
}
|
||||
|
||||
/// Executes a statement, returning the number of rows modified.
|
||||
///
|
||||
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
|
||||
/// provided, 1-indexed.
|
||||
///
|
||||
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
|
||||
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
|
||||
/// with the `prepare` method.
|
||||
///
|
||||
/// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the number of parameters provided does not match the number expected.
|
||||
pub async fn execute<T>(
|
||||
&self,
|
||||
statement: &T,
|
||||
params: &[&(dyn ToSql + Sync)],
|
||||
) -> Result<u64, Error>
|
||||
where
|
||||
T: ?Sized + ToStatement,
|
||||
{
|
||||
self.execute_raw(statement, slice_iter(params)).await
|
||||
}
|
||||
|
||||
/// The maximally flexible version of [`execute`].
|
||||
///
|
||||
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
|
||||
/// provided, 1-indexed.
|
||||
///
|
||||
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
|
||||
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
|
||||
/// with the `prepare` method.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the number of parameters provided does not match the number expected.
|
||||
///
|
||||
/// [`execute`]: #method.execute
|
||||
pub async fn execute_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<u64, Error>
|
||||
where
|
||||
T: ?Sized + ToStatement,
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let statement = statement.__convert().into_statement(self).await?;
|
||||
query::execute(self.inner(), statement, params).await
|
||||
}
|
||||
|
||||
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
|
||||
///
|
||||
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
#![allow(async_fn_in_trait)]
|
||||
|
||||
use crate::query::RowStream;
|
||||
use crate::types::Type;
|
||||
use crate::{Client, Error, Transaction};
|
||||
use async_trait::async_trait;
|
||||
use postgres_protocol2::Oid;
|
||||
|
||||
mod private {
|
||||
@@ -12,6 +11,7 @@ mod private {
|
||||
/// A trait allowing abstraction over connections and transactions.
|
||||
///
|
||||
/// This trait is "sealed", and cannot be implemented outside of this crate.
|
||||
#[async_trait]
|
||||
pub trait GenericClient: private::Sealed {
|
||||
/// Like `Client::query_raw_txt`.
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
@@ -26,6 +26,7 @@ pub trait GenericClient: private::Sealed {
|
||||
|
||||
impl private::Sealed for Client {}
|
||||
|
||||
#[async_trait]
|
||||
impl GenericClient for Client {
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
@@ -38,12 +39,14 @@ impl GenericClient for Client {
|
||||
|
||||
/// Query for type information
|
||||
async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
|
||||
crate::prepare::get_type(self.inner(), oid).await
|
||||
self.get_type(oid).await
|
||||
}
|
||||
}
|
||||
|
||||
impl private::Sealed for Transaction<'_> {}
|
||||
|
||||
#[async_trait]
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
impl GenericClient for Transaction<'_> {
|
||||
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
|
||||
where
|
||||
|
||||
@@ -14,6 +14,7 @@ pub use crate::row::{Row, SimpleQueryRow};
|
||||
pub use crate::simple_query::SimpleQueryStream;
|
||||
pub use crate::statement::{Column, Statement};
|
||||
pub use crate::tls::NoTls;
|
||||
pub use crate::to_statement::ToStatement;
|
||||
pub use crate::transaction::Transaction;
|
||||
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
|
||||
use crate::types::ToSql;
|
||||
@@ -64,6 +65,7 @@ pub mod row;
|
||||
mod simple_query;
|
||||
mod statement;
|
||||
pub mod tls;
|
||||
mod to_statement;
|
||||
mod transaction;
|
||||
mod transaction_builder;
|
||||
pub mod types;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::client::InnerClient;
|
||||
use crate::codec::FrontendMessage;
|
||||
use crate::connection::RequestMessages;
|
||||
use crate::error::SqlState;
|
||||
use crate::types::{Field, Kind, Oid, Type};
|
||||
use crate::{query, slice_iter};
|
||||
use crate::{Column, Error, Statement};
|
||||
@@ -12,6 +13,7 @@ use postgres_protocol2::message::backend::Message;
|
||||
use postgres_protocol2::message::frontend;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) const TYPEINFO_QUERY: &str = "\
|
||||
@@ -22,6 +24,14 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
|
||||
WHERE t.oid = $1
|
||||
";
|
||||
|
||||
// Range types weren't added until Postgres 9.2, so pg_range may not exist
|
||||
const TYPEINFO_FALLBACK_QUERY: &str = "\
|
||||
SELECT t.typname, t.typtype, t.typelem, NULL::OID, t.typbasetype, n.nspname, t.typrelid
|
||||
FROM pg_catalog.pg_type t
|
||||
INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
|
||||
WHERE t.oid = $1
|
||||
";
|
||||
|
||||
const TYPEINFO_ENUM_QUERY: &str = "\
|
||||
SELECT enumlabel
|
||||
FROM pg_catalog.pg_enum
|
||||
@@ -29,6 +39,14 @@ WHERE enumtypid = $1
|
||||
ORDER BY enumsortorder
|
||||
";
|
||||
|
||||
// Postgres 9.0 didn't have enumsortorder
|
||||
const TYPEINFO_ENUM_FALLBACK_QUERY: &str = "\
|
||||
SELECT enumlabel
|
||||
FROM pg_catalog.pg_enum
|
||||
WHERE enumtypid = $1
|
||||
ORDER BY oid
|
||||
";
|
||||
|
||||
pub(crate) const TYPEINFO_COMPOSITE_QUERY: &str = "\
|
||||
SELECT attname, atttypid
|
||||
FROM pg_catalog.pg_attribute
|
||||
@@ -38,13 +56,15 @@ AND attnum > 0
|
||||
ORDER BY attnum
|
||||
";
|
||||
|
||||
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub async fn prepare(
|
||||
client: &Arc<InnerClient>,
|
||||
name: &'static str,
|
||||
query: &str,
|
||||
types: &[Type],
|
||||
) -> Result<Statement, Error> {
|
||||
let buf = encode(client, name, query, types)?;
|
||||
let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
|
||||
let buf = encode(client, &name, query, types)?;
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
|
||||
match responses.next().await? {
|
||||
@@ -85,11 +105,10 @@ pub async fn prepare(
|
||||
|
||||
fn prepare_rec<'a>(
|
||||
client: &'a Arc<InnerClient>,
|
||||
name: &'static str,
|
||||
query: &'a str,
|
||||
types: &'a [Type],
|
||||
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
|
||||
Box::pin(prepare(client, name, query, types))
|
||||
Box::pin(prepare(client, query, types))
|
||||
}
|
||||
|
||||
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
|
||||
@@ -173,8 +192,13 @@ async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Erro
|
||||
return Ok(stmt);
|
||||
}
|
||||
|
||||
let typeinfo = "neon_proxy_typeinfo";
|
||||
let stmt = prepare_rec(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
|
||||
let stmt = match prepare_rec(client, TYPEINFO_QUERY, &[]).await {
|
||||
Ok(stmt) => stmt,
|
||||
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => {
|
||||
prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await?
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
client.set_typeinfo(&stmt);
|
||||
Ok(stmt)
|
||||
@@ -195,8 +219,13 @@ async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement,
|
||||
return Ok(stmt);
|
||||
}
|
||||
|
||||
let typeinfo = "neon_proxy_typeinfo_enum";
|
||||
let stmt = prepare_rec(client, typeinfo, TYPEINFO_ENUM_QUERY, &[]).await?;
|
||||
let stmt = match prepare_rec(client, TYPEINFO_ENUM_QUERY, &[]).await {
|
||||
Ok(stmt) => stmt,
|
||||
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => {
|
||||
prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
client.set_typeinfo_enum(&stmt);
|
||||
Ok(stmt)
|
||||
@@ -226,8 +255,7 @@ async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<State
|
||||
return Ok(stmt);
|
||||
}
|
||||
|
||||
let typeinfo = "neon_proxy_typeinfo_composite";
|
||||
let stmt = prepare_rec(client, typeinfo, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
|
||||
let stmt = prepare_rec(client, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
|
||||
|
||||
client.set_typeinfo_composite(&stmt);
|
||||
Ok(stmt)
|
||||
|
||||
@@ -157,6 +157,49 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn execute<'a, I>(
|
||||
client: &InnerClient,
|
||||
statement: Statement,
|
||||
params: I,
|
||||
) -> Result<u64, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let buf = if log_enabled!(Level::Debug) {
|
||||
let params = params.into_iter().collect::<Vec<_>>();
|
||||
debug!(
|
||||
"executing statement {} with parameters: {:?}",
|
||||
statement.name(),
|
||||
BorrowToSqlParamsDebug(params.as_slice()),
|
||||
);
|
||||
encode(client, &statement, params)?
|
||||
} else {
|
||||
encode(client, &statement, params)?
|
||||
};
|
||||
let mut responses = start(client, buf).await?;
|
||||
|
||||
let mut rows = 0;
|
||||
loop {
|
||||
match responses.next().await? {
|
||||
Message::DataRow(_) => {}
|
||||
Message::CommandComplete(body) => {
|
||||
rows = body
|
||||
.tag()
|
||||
.map_err(Error::parse)?
|
||||
.rsplit(' ')
|
||||
.next()
|
||||
.unwrap()
|
||||
.parse()
|
||||
.unwrap_or(0);
|
||||
}
|
||||
Message::EmptyQueryResponse => rows = 0,
|
||||
Message::ReadyForQuery(_) => return Ok(rows),
|
||||
_ => return Err(Error::unexpected_message()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
|
||||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::{
|
||||
|
||||
struct StatementInner {
|
||||
client: Weak<InnerClient>,
|
||||
name: &'static str,
|
||||
name: String,
|
||||
params: Vec<Type>,
|
||||
columns: Vec<Column>,
|
||||
}
|
||||
@@ -22,7 +22,7 @@ impl Drop for StatementInner {
|
||||
fn drop(&mut self) {
|
||||
if let Some(client) = self.client.upgrade() {
|
||||
let buf = client.with_buf(|buf| {
|
||||
frontend::close(b'S', self.name, buf).unwrap();
|
||||
frontend::close(b'S', &self.name, buf).unwrap();
|
||||
frontend::sync(buf);
|
||||
buf.split().freeze()
|
||||
});
|
||||
@@ -40,7 +40,7 @@ pub struct Statement(Arc<StatementInner>);
|
||||
impl Statement {
|
||||
pub(crate) fn new(
|
||||
inner: &Arc<InnerClient>,
|
||||
name: &'static str,
|
||||
name: String,
|
||||
params: Vec<Type>,
|
||||
columns: Vec<Column>,
|
||||
) -> Statement {
|
||||
@@ -55,14 +55,14 @@ impl Statement {
|
||||
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
|
||||
Statement(Arc::new(StatementInner {
|
||||
client: Weak::new(),
|
||||
name: "<anonymous>",
|
||||
name: String::new(),
|
||||
params,
|
||||
columns,
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn name(&self) -> &str {
|
||||
self.0.name
|
||||
&self.0.name
|
||||
}
|
||||
|
||||
/// Returns the expected types of the statement's parameters.
|
||||
|
||||
57
libs/proxy/tokio-postgres2/src/to_statement.rs
Normal file
57
libs/proxy/tokio-postgres2/src/to_statement.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use crate::to_statement::private::{Sealed, ToStatementType};
|
||||
use crate::Statement;
|
||||
|
||||
mod private {
|
||||
use crate::{Client, Error, Statement};
|
||||
|
||||
pub trait Sealed {}
|
||||
|
||||
pub enum ToStatementType<'a> {
|
||||
Statement(&'a Statement),
|
||||
Query(&'a str),
|
||||
}
|
||||
|
||||
impl ToStatementType<'_> {
|
||||
pub async fn into_statement(self, client: &Client) -> Result<Statement, Error> {
|
||||
match self {
|
||||
ToStatementType::Statement(s) => Ok(s.clone()),
|
||||
ToStatementType::Query(s) => client.prepare(s).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait abstracting over prepared and unprepared statements.
|
||||
///
|
||||
/// Many methods are generic over this bound, so that they support both a raw query string as well as a statement which
|
||||
/// was prepared previously.
|
||||
///
|
||||
/// This trait is "sealed" and cannot be implemented by anything outside this crate.
|
||||
pub trait ToStatement: Sealed {
|
||||
#[doc(hidden)]
|
||||
fn __convert(&self) -> ToStatementType<'_>;
|
||||
}
|
||||
|
||||
impl ToStatement for Statement {
|
||||
fn __convert(&self) -> ToStatementType<'_> {
|
||||
ToStatementType::Statement(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sealed for Statement {}
|
||||
|
||||
impl ToStatement for str {
|
||||
fn __convert(&self) -> ToStatementType<'_> {
|
||||
ToStatementType::Query(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sealed for str {}
|
||||
|
||||
impl ToStatement for String {
|
||||
fn __convert(&self) -> ToStatementType<'_> {
|
||||
ToStatementType::Query(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sealed for String {}
|
||||
@@ -28,7 +28,7 @@ inferno.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true }
|
||||
jsonwebtoken.workspace = true
|
||||
nix = {workspace = true, features = [ "ioctl" ] }
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
regex.workspace = true
|
||||
|
||||
@@ -93,9 +93,6 @@ pub mod try_rcu;
|
||||
|
||||
pub mod guard_arc_swap;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
//! Linux-specific socket ioctls.
|
||||
//!
|
||||
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
|
||||
|
||||
use std::{
|
||||
io,
|
||||
mem::MaybeUninit,
|
||||
os::{fd::RawFd, raw::c_int},
|
||||
};
|
||||
|
||||
use nix::libc::{FIONREAD, TIOCOUTQ};
|
||||
|
||||
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
|
||||
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
|
||||
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
|
||||
if err == 0 {
|
||||
Ok(inq.assume_init())
|
||||
} else {
|
||||
Err(io::Error::last_os_error())
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
|
||||
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
|
||||
do_ioctl(socket_fd, FIONREAD)
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
|
||||
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
|
||||
do_ioctl(socket_fd, TIOCOUTQ)
|
||||
}
|
||||
@@ -13,7 +13,7 @@
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use pageserver_api::key::{rel_block_to_key, Key};
|
||||
use pageserver_api::key::Key;
|
||||
use postgres_ffi::pg_constants;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::{Instant, SystemTime};
|
||||
@@ -501,9 +501,13 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
// TODO: investigate using get_vectored for the entire startblk..endblk range.
|
||||
// But this code path is not on the critical path for most basebackups (?).
|
||||
.get(rel_block_to_key(src, blknum), self.lsn, self.ctx)
|
||||
.get_rel_page_at_lsn(
|
||||
src,
|
||||
blknum,
|
||||
Version::Lsn(self.lsn),
|
||||
self.ctx,
|
||||
self.io_concurrency.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Server(e.into()))?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -130,7 +129,7 @@ pub(crate) static LAYERS_PER_READ: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
"Layers visited to serve a single read (read amplification). In a batch, all visited layers count towards every read.",
|
||||
&["tenant_id", "shard_id", "timeline_id"],
|
||||
// Low resolution to reduce cardinality.
|
||||
vec![4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0],
|
||||
vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -1440,66 +1439,27 @@ impl Drop for SmgrOpTimer {
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
/// The caller must guarantee that `socket_fd`` outlives this function.
|
||||
pub(crate) async fn measure<Fut, O>(
|
||||
self,
|
||||
started_at: Instant,
|
||||
mut fut: Fut,
|
||||
socket_fd: RawFd,
|
||||
) -> O
|
||||
pub(crate) async fn measure<Fut, O>(self, mut started_at: Instant, mut fut: Fut) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
let mut logged = false;
|
||||
let mut last_counter_increment_at = started_at;
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `now`.
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|is_timeout| {
|
||||
|| {
|
||||
let now = Instant::now();
|
||||
|
||||
// Increment counter
|
||||
{
|
||||
let elapsed_since_last_observe = now - last_counter_increment_at;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
|
||||
last_counter_increment_at = now;
|
||||
}
|
||||
|
||||
// Log something on every timeout, and on completion but only if we hit a timeout.
|
||||
if is_timeout || logged {
|
||||
logged = true;
|
||||
let elapsed_total = now - started_at;
|
||||
let msg = if is_timeout {
|
||||
"slow flush ongoing"
|
||||
} else {
|
||||
"slow flush completed or cancelled"
|
||||
};
|
||||
|
||||
let (inq, outq) = {
|
||||
// SAFETY: caller guarantees that `socket_fd` outlives this function.
|
||||
#[cfg(target_os = "linux")]
|
||||
unsafe {
|
||||
(
|
||||
utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2),
|
||||
utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2),
|
||||
)
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
_ = socket_fd; // appease unused lint on macOS
|
||||
(-1, -1)
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64());
|
||||
tracing::info!(elapsed_total_secs, inq, outq, msg);
|
||||
}
|
||||
let elapsed = now - started_at;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
started_at = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe(false);
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1507,7 +1467,7 @@ impl SmgrOpFlushInProgress {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)(true);
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,6 @@ use pageserver_api::models::PageTraceEvent;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::os::fd::AsRawFd;
|
||||
|
||||
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
|
||||
/// is not yet in state [`TenantState::Active`].
|
||||
@@ -237,7 +236,7 @@ pub async fn libpq_listener_main(
|
||||
|
||||
type ConnectionHandlerResult = anyhow::Result<()>;
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr, application_name))]
|
||||
#[instrument(skip_all, fields(peer_addr))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn page_service_conn_main(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -258,8 +257,6 @@ async fn page_service_conn_main(
|
||||
.set_nodelay(true)
|
||||
.context("could not set TCP_NODELAY")?;
|
||||
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
@@ -308,7 +305,7 @@ async fn page_service_conn_main(
|
||||
cancel.clone(),
|
||||
gate_guard,
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend.run(&mut conn_handler, &cancel).await {
|
||||
Ok(()) => {
|
||||
@@ -1289,15 +1286,12 @@ impl PageServerHandler {
|
||||
))?;
|
||||
|
||||
// what we want to do
|
||||
let socket_fd = pgb_writer.socket_fd;
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
Instant::now(),
|
||||
flush_fut,
|
||||
socket_fd,
|
||||
)),
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
|
||||
}
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
@@ -2463,16 +2457,9 @@ where
|
||||
fn startup(
|
||||
&mut self,
|
||||
_pgb: &mut PostgresBackend<IO>,
|
||||
sm: &FeStartupPacket,
|
||||
_sm: &FeStartupPacket,
|
||||
) -> Result<(), QueryError> {
|
||||
fail::fail_point!("ps::connection-start::startup-packet");
|
||||
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
Span::current().record("application_name", field::display(app_name));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -23,14 +23,13 @@ use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
|
||||
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
|
||||
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
|
||||
twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY,
|
||||
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
};
|
||||
use pageserver_api::key::{rel_tag_sparse_key, Key};
|
||||
use pageserver_api::keyspace::SparseKeySpace;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
@@ -491,33 +490,12 @@ impl Timeline {
|
||||
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Read path: first read the new reldir keyspace. Early return if the relation exists.
|
||||
// Otherwise, read the old reldir keyspace.
|
||||
// TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
|
||||
|
||||
if self.get_rel_size_v2_enabled() {
|
||||
// fetch directory listing (new)
|
||||
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
|
||||
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
|
||||
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
let exists_v2 = buf == RelDirExists::Exists;
|
||||
// Fast path: if the relation exists in the new format, return true.
|
||||
// TODO: we should have a verification mode that checks both keyspaces
|
||||
// to ensure the relation only exists in one of them.
|
||||
if exists_v2 {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
// fetch directory listing (old)
|
||||
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
|
||||
Ok(exists_v1)
|
||||
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
|
||||
}
|
||||
|
||||
/// Get a list of all existing relations in given tablespace and database.
|
||||
@@ -535,12 +513,12 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
// fetch directory listing (old)
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let rels_v1: HashSet<RelTag> =
|
||||
let rels: HashSet<RelTag> =
|
||||
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
@@ -548,46 +526,6 @@ impl Timeline {
|
||||
forknum: *forknum,
|
||||
}));
|
||||
|
||||
if !self.get_rel_size_v2_enabled() {
|
||||
return Ok(rels_v1);
|
||||
}
|
||||
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
);
|
||||
let results = self
|
||||
.scan(
|
||||
KeySpace::single(key_range),
|
||||
version.get_lsn(),
|
||||
ctx,
|
||||
io_concurrency,
|
||||
)
|
||||
.await?;
|
||||
let mut rels = rels_v1;
|
||||
for (key, val) in results {
|
||||
let val = RelDirExists::decode(&val?)
|
||||
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
assert_eq!(key.field6, 1);
|
||||
assert_eq!(key.field2, spcnode);
|
||||
assert_eq!(key.field3, dbnode);
|
||||
let tag = RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
};
|
||||
if val == RelDirExists::Removed {
|
||||
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
|
||||
continue;
|
||||
}
|
||||
let did_not_contain = rels.insert(tag);
|
||||
debug_assert!(did_not_contain, "duplicate reltag in v2");
|
||||
}
|
||||
Ok(rels)
|
||||
}
|
||||
|
||||
@@ -1206,11 +1144,7 @@ impl Timeline {
|
||||
|
||||
let dense_keyspace = result.to_keyspace();
|
||||
let sparse_keyspace = SparseKeySpace(KeySpace {
|
||||
ranges: vec![
|
||||
Key::metadata_aux_key_range(),
|
||||
repl_origin_key_range(),
|
||||
Key::rel_dir_sparse_key_range(),
|
||||
],
|
||||
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
|
||||
});
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
@@ -1340,22 +1274,12 @@ pub struct DatadirModification<'a> {
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
|
||||
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
|
||||
pending_metadata_bytes: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum MetricsUpdate {
|
||||
/// Set the metrics to this value
|
||||
Set(u64),
|
||||
/// Increment the metrics by this value
|
||||
Add(u64),
|
||||
/// Decrement the metrics by this value
|
||||
Sub(u64),
|
||||
}
|
||||
|
||||
impl DatadirModification<'_> {
|
||||
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
|
||||
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
|
||||
@@ -1435,8 +1359,7 @@ impl DatadirModification<'_> {
|
||||
let buf = DbDirectory::ser(&DbDirectory {
|
||||
dbdirs: HashMap::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, MetricsUpdate::Set(0)));
|
||||
self.pending_directory_entries.push((DirectoryKind::Db, 0));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
let buf = if self.tline.pg_version >= 17 {
|
||||
@@ -1449,7 +1372,7 @@ impl DatadirModification<'_> {
|
||||
})
|
||||
}?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
|
||||
.push((DirectoryKind::TwoPhase, 0));
|
||||
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
|
||||
@@ -1459,23 +1382,17 @@ impl DatadirModification<'_> {
|
||||
// harmless but they'd just be dropped on later compaction.
|
||||
if self.tline.tenant_shard_id.is_shard_zero() {
|
||||
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(SlruKind::Clog),
|
||||
MetricsUpdate::Set(0),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
|
||||
self.put(
|
||||
slru_dir_to_key(SlruKind::MultiXactMembers),
|
||||
empty_dir.clone(),
|
||||
);
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(SlruKind::Clog),
|
||||
MetricsUpdate::Set(0),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
|
||||
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
|
||||
MetricsUpdate::Set(0),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1741,16 +1658,10 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
if r.is_none() {
|
||||
// Create RelDirectory
|
||||
// TODO: if we have fully migrated to v2, no need to create this directory
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
|
||||
if self.tline.get_rel_size_v2_enabled() {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
}
|
||||
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
|
||||
self.put(
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Value::Image(Bytes::from(buf)),
|
||||
@@ -1774,10 +1685,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
MetricsUpdate::Set(dir.xids.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
|
||||
} else {
|
||||
let xid = xid as u32;
|
||||
@@ -1785,10 +1694,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
MetricsUpdate::Set(dir.xids.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
|
||||
};
|
||||
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
|
||||
@@ -1837,10 +1744,8 @@ impl DatadirModification<'_> {
|
||||
let mut dir = DbDirectory::des(&buf)?;
|
||||
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
|
||||
let buf = DbDirectory::ser(&dir)?;
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::Db,
|
||||
MetricsUpdate::Set(dir.dbdirs.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
} else {
|
||||
warn!(
|
||||
@@ -1873,85 +1778,39 @@ impl DatadirModification<'_> {
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?;
|
||||
|
||||
let dbdir_exists =
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::Db,
|
||||
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
false
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if !dbdir_exists {
|
||||
// Create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
if self.tline.get_rel_size_v2_enabled() {
|
||||
let sparse_rel_dir_key =
|
||||
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists in v2
|
||||
let val = self
|
||||
.sparse_get(sparse_rel_dir_key, ctx)
|
||||
.await
|
||||
.map_err(|e| RelationError::Other(e.into()))?;
|
||||
let val = RelDirExists::decode_option(val)
|
||||
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
if val == RelDirExists::Exists {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
self.put(
|
||||
sparse_rel_dir_key,
|
||||
Value::Image(RelDirExists::Exists.encode()),
|
||||
);
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
|
||||
// We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
|
||||
// TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
|
||||
// will be key not found errors if we don't create an empty one for rel_size_v2.
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
|
||||
} else {
|
||||
if !dbdir_exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
@@ -2037,34 +1896,9 @@ impl DatadirModification<'_> {
|
||||
|
||||
let mut dirty = false;
|
||||
for rel_tag in rel_tags {
|
||||
let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
|
||||
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
|
||||
dirty = true;
|
||||
true
|
||||
} else if self.tline.get_rel_size_v2_enabled() {
|
||||
// The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
|
||||
// Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
|
||||
// logic).
|
||||
let key =
|
||||
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
|
||||
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
|
||||
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
|
||||
if val == RelDirExists::Exists {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
|
||||
// put tombstone
|
||||
self.put(key, Value::Image(RelDirExists::Removed.encode()));
|
||||
// no need to set dirty to true
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if found {
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel_tag);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
@@ -2080,6 +1914,8 @@ impl DatadirModification<'_> {
|
||||
|
||||
if dirty {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2103,10 +1939,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.segments.insert(segno) {
|
||||
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(kind),
|
||||
MetricsUpdate::Set(dir.segments.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
|
||||
self.put(
|
||||
dir_key,
|
||||
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
||||
@@ -2153,10 +1987,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.segments.remove(&segno) {
|
||||
warn!("slru segment {:?}/{} does not exist", kind, segno);
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::SlruSegment(kind),
|
||||
MetricsUpdate::Set(dir.segments.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
|
||||
self.put(
|
||||
dir_key,
|
||||
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
||||
@@ -2188,10 +2020,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.xids.remove(&xid) {
|
||||
warn!("twophase file for xid {} does not exist", xid);
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
MetricsUpdate::Set(dir.xids.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
|
||||
} else {
|
||||
let xid: u32 = u32::try_from(xid)?;
|
||||
@@ -2200,10 +2030,8 @@ impl DatadirModification<'_> {
|
||||
if !dir.xids.remove(&xid) {
|
||||
warn!("twophase file for xid {} does not exist", xid);
|
||||
}
|
||||
self.pending_directory_entries.push((
|
||||
DirectoryKind::TwoPhase,
|
||||
MetricsUpdate::Set(dir.xids.len() as u64),
|
||||
));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
|
||||
};
|
||||
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
|
||||
@@ -2319,7 +2147,7 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
|
||||
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
|
||||
writer.update_directory_entries_count(kind, count);
|
||||
writer.update_directory_entries_count(kind, count as u64);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -2405,7 +2233,7 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
|
||||
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
|
||||
writer.update_directory_entries_count(kind, count);
|
||||
writer.update_directory_entries_count(kind, count as u64);
|
||||
}
|
||||
|
||||
self.pending_metadata_bytes = 0;
|
||||
@@ -2469,22 +2297,6 @@ impl DatadirModification<'_> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
/// Get a key from the sparse keyspace. Automatically converts the missing key error
|
||||
/// and the empty value into None.
|
||||
async fn sparse_get(
|
||||
&self,
|
||||
key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, PageReconstructError> {
|
||||
let val = self.get(key, ctx).await;
|
||||
match val {
|
||||
Ok(val) if val.is_empty() => Ok(None),
|
||||
Ok(val) => Ok(Some(val)),
|
||||
Err(PageReconstructError::MissingKey(_)) => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
if Self::is_data_key(&key) {
|
||||
self.put_data(key.to_compact(), val)
|
||||
@@ -2567,23 +2379,6 @@ impl Version<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a key from the sparse keyspace. Automatically converts the missing key error
|
||||
/// and the empty value into None.
|
||||
async fn sparse_get(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, PageReconstructError> {
|
||||
let val = self.get(timeline, key, ctx).await;
|
||||
match val {
|
||||
Ok(val) if val.is_empty() => Ok(None),
|
||||
Ok(val) => Ok(Some(val)),
|
||||
Err(PageReconstructError::MissingKey(_)) => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
@@ -2643,7 +2438,6 @@ pub(crate) enum DirectoryKind {
|
||||
Rel,
|
||||
AuxFiles,
|
||||
SlruSegment(SlruKind),
|
||||
RelV2,
|
||||
}
|
||||
|
||||
impl DirectoryKind {
|
||||
|
||||
@@ -3924,13 +3924,6 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
pub fn get_rel_size_v2_enabled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.rel_size_v2_enabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
|
||||
}
|
||||
|
||||
pub fn get_compaction_upper_limit(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -5647,7 +5640,7 @@ pub(crate) mod harness {
|
||||
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
|
||||
timeline_offloading: Some(tenant_conf.timeline_offloading),
|
||||
wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
|
||||
rel_size_v2_enabled: Some(tenant_conf.rel_size_v2_enabled),
|
||||
rel_size_v2_enabled: tenant_conf.rel_size_v2_enabled,
|
||||
gc_compaction_enabled: Some(tenant_conf.gc_compaction_enabled),
|
||||
gc_compaction_initial_threshold_kb: Some(
|
||||
tenant_conf.gc_compaction_initial_threshold_kb,
|
||||
|
||||
@@ -485,9 +485,7 @@ impl TenantConfOpt {
|
||||
wal_receiver_protocol_override: self
|
||||
.wal_receiver_protocol_override
|
||||
.or(global_conf.wal_receiver_protocol_override),
|
||||
rel_size_v2_enabled: self
|
||||
.rel_size_v2_enabled
|
||||
.unwrap_or(global_conf.rel_size_v2_enabled),
|
||||
rel_size_v2_enabled: self.rel_size_v2_enabled.or(global_conf.rel_size_v2_enabled),
|
||||
gc_compaction_enabled: self
|
||||
.gc_compaction_enabled
|
||||
.unwrap_or(global_conf.gc_compaction_enabled),
|
||||
|
||||
@@ -117,7 +117,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::metrics::{TimelineMetrics, DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL};
|
||||
use crate::pgdatadir_mapping::{CalculateLogicalSizeError, MetricsUpdate};
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
@@ -327,7 +327,6 @@ pub struct Timeline {
|
||||
// in `crate::page_service` writes these metrics.
|
||||
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
|
||||
|
||||
directory_metrics_inited: [AtomicBool; DirectoryKind::KINDS_NUM],
|
||||
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
|
||||
|
||||
/// Ensures layers aren't frozen by checkpointer between
|
||||
@@ -2356,14 +2355,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.rel_size_v2_enabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
|
||||
}
|
||||
|
||||
fn get_compaction_upper_limit(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -2673,7 +2664,6 @@ impl Timeline {
|
||||
),
|
||||
|
||||
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
|
||||
directory_metrics_inited: array::from_fn(|_| AtomicBool::new(false)),
|
||||
|
||||
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
|
||||
|
||||
@@ -3440,42 +3430,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) {
|
||||
// TODO: this directory metrics is not correct -- we could have multiple reldirs in the system
|
||||
// for each of the database, but we only store one value, and therefore each pgdirmodification
|
||||
// would overwrite the previous value if they modify different databases.
|
||||
|
||||
match count {
|
||||
MetricsUpdate::Set(count) => {
|
||||
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
|
||||
self.directory_metrics_inited[kind.offset()].store(true, AtomicOrdering::Relaxed);
|
||||
}
|
||||
MetricsUpdate::Add(count) => {
|
||||
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
|
||||
// it's fine.
|
||||
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
|
||||
// The metrics has been initialized with `MetricsUpdate::Set` before, so we can add/sub
|
||||
// the value reliably.
|
||||
self.directory_metrics[kind.offset()].fetch_add(count, AtomicOrdering::Relaxed);
|
||||
}
|
||||
// Otherwise, ignore this update
|
||||
}
|
||||
MetricsUpdate::Sub(count) => {
|
||||
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
|
||||
// it's fine.
|
||||
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
|
||||
// The metrics has been initialized with `MetricsUpdate::Set` before.
|
||||
// The operation could overflow so we need to normalize the value.
|
||||
let prev_val =
|
||||
self.directory_metrics[kind.offset()].load(AtomicOrdering::Relaxed);
|
||||
let res = prev_val.saturating_sub(count);
|
||||
self.directory_metrics[kind.offset()].store(res, AtomicOrdering::Relaxed);
|
||||
}
|
||||
// Otherwise, ignore this update
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: remove this, there's no place in the code that updates this aux metrics.
|
||||
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
|
||||
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
|
||||
let aux_metric =
|
||||
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
|
||||
|
||||
@@ -3693,9 +3649,7 @@ impl Timeline {
|
||||
// space. If that's not the case, we had at least one key encounter a gap in the image layer
|
||||
// and stop the search as a result of that.
|
||||
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
|
||||
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
|
||||
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
|
||||
// figuring out what is the inherited key range and do a fine-grained pruning.
|
||||
// Do not fire missing key error for sparse keys.
|
||||
removed.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![SPARSE_RANGE],
|
||||
});
|
||||
|
||||
@@ -7,9 +7,7 @@ use super::Timeline;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::ShutdownIfArchivedError;
|
||||
use crate::tenant::timeline::delete::{make_timeline_delete_guard, TimelineDeleteGuardKind};
|
||||
use crate::tenant::{
|
||||
DeleteTimelineError, OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded,
|
||||
};
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum OffloadError {
|
||||
@@ -39,25 +37,12 @@ pub(crate) async fn offload_timeline(
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
tracing::info!("offloading archived timeline");
|
||||
|
||||
let delete_guard_res = make_timeline_delete_guard(
|
||||
let (timeline, guard) = make_timeline_delete_guard(
|
||||
tenant,
|
||||
timeline.timeline_id,
|
||||
TimelineDeleteGuardKind::Offload,
|
||||
);
|
||||
if let Err(DeleteTimelineError::HasChildren(children)) = delete_guard_res {
|
||||
let is_archived = timeline.is_archived();
|
||||
if is_archived == Some(true) {
|
||||
tracing::error!("timeline is archived but has non-archived children: {children:?}");
|
||||
return Err(OffloadError::NotArchived);
|
||||
}
|
||||
tracing::info!(
|
||||
?is_archived,
|
||||
"timeline is not archived and has unarchived children"
|
||||
);
|
||||
return Err(OffloadError::NotArchived);
|
||||
};
|
||||
let (timeline, guard) =
|
||||
delete_guard_res.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
)
|
||||
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
|
||||
@@ -18,8 +18,6 @@
|
||||
#include "neon_utils.h"
|
||||
|
||||
static int extension_server_port = 0;
|
||||
static int extension_server_request_timeout = 60;
|
||||
static int extension_server_connect_timeout = 60;
|
||||
|
||||
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
|
||||
|
||||
@@ -36,18 +34,19 @@ static download_extension_file_hook_type prev_download_extension_file_hook = NUL
|
||||
static bool
|
||||
neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
{
|
||||
static CURL *handle = NULL;
|
||||
|
||||
CURLcode res;
|
||||
bool ret = false;
|
||||
CURL *handle = NULL;
|
||||
char *compute_ctl_url;
|
||||
bool ret = false;
|
||||
|
||||
handle = alloc_curl_handle();
|
||||
if (handle == NULL)
|
||||
{
|
||||
handle = alloc_curl_handle();
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
if (extension_server_request_timeout > 0)
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, (long)extension_server_request_timeout /* seconds */ );
|
||||
if (extension_server_connect_timeout > 0)
|
||||
curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ );
|
||||
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
|
||||
extension_server_port, filename, is_library ? "?is_library=true" : "");
|
||||
@@ -58,8 +57,6 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
|
||||
/* Perform the request, res will get the return code */
|
||||
res = curl_easy_perform(handle);
|
||||
curl_easy_cleanup(handle);
|
||||
|
||||
/* Check for errors */
|
||||
if (res == CURLE_OK)
|
||||
{
|
||||
@@ -91,24 +88,6 @@ pg_init_extension_server()
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.extension_server_request_timeout",
|
||||
"timeout for fetching extensions in seconds",
|
||||
NULL,
|
||||
&extension_server_request_timeout,
|
||||
60, 0, INT_MAX,
|
||||
PGC_SUSET,
|
||||
GUC_UNIT_S,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.extension_server_connect_timeout",
|
||||
"timeout for connecting to the extension server in seconds",
|
||||
NULL,
|
||||
&extension_server_connect_timeout,
|
||||
60, 0, INT_MAX,
|
||||
PGC_SUSET,
|
||||
GUC_UNIT_S,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
/* set download_extension_file_hook */
|
||||
prev_download_extension_file_hook = download_extension_file_hook;
|
||||
download_extension_file_hook = neon_download_extension_file_http;
|
||||
|
||||
@@ -122,8 +122,8 @@ addSHLL(HyperLogLogState *cState, uint32 hash)
|
||||
index = hash >> HLL_C_BITS;
|
||||
|
||||
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
|
||||
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS) - 1;
|
||||
Assert(count <= HLL_C_BITS);
|
||||
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
|
||||
|
||||
cState->regs[index][count] = now;
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ getMaximum(const TimestampTz* reg, TimestampTz since)
|
||||
{
|
||||
if (reg[i] >= since)
|
||||
{
|
||||
max = i + 1;
|
||||
max = i;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -378,9 +378,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
case PS_Disconnected:
|
||||
{
|
||||
const char *keywords[4];
|
||||
const char *values[4];
|
||||
char pid_str[16];
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n_pgsql_params;
|
||||
TimestampTz now;
|
||||
int64 us_since_last_attempt;
|
||||
@@ -425,30 +424,14 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
* can override the password from the env variable. Seems useful, although
|
||||
* we don't currently use that capability anywhere.
|
||||
*/
|
||||
n_pgsql_params = 0;
|
||||
|
||||
/*
|
||||
* Pageserver logs include this in the connection's tracing span.
|
||||
* This allows for reasier log correlation between compute and pageserver.
|
||||
*/
|
||||
keywords[n_pgsql_params] = "application_name";
|
||||
{
|
||||
int ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
|
||||
if (ret < 0 || ret >= (int)(sizeof(pid_str)))
|
||||
elog(FATAL, "stack-allocated buffer too small to hold pid");
|
||||
}
|
||||
/* lifetime: PQconnectStartParams strdups internally */
|
||||
values[n_pgsql_params] = (const char*) pid_str;
|
||||
n_pgsql_params++;
|
||||
|
||||
keywords[n_pgsql_params] = "dbname";
|
||||
values[n_pgsql_params] = connstr;
|
||||
n_pgsql_params++;
|
||||
keywords[0] = "dbname";
|
||||
values[0] = connstr;
|
||||
n_pgsql_params = 1;
|
||||
|
||||
if (neon_auth_token)
|
||||
{
|
||||
keywords[n_pgsql_params] = "password";
|
||||
values[n_pgsql_params] = neon_auth_token;
|
||||
keywords[1] = "password";
|
||||
values[1] = neon_auth_token;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
|
||||
25
poetry.lock
generated
25
poetry.lock
generated
@@ -412,7 +412,6 @@ files = [
|
||||
|
||||
[package.dependencies]
|
||||
botocore-stubs = "*"
|
||||
mypy-boto3-kms = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"kms\""}
|
||||
mypy-boto3-s3 = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"s3\""}
|
||||
types-s3transfer = "*"
|
||||
typing-extensions = ">=4.1.0"
|
||||
@@ -2023,18 +2022,6 @@ install-types = ["pip"]
|
||||
mypyc = ["setuptools (>=50)"]
|
||||
reports = ["lxml"]
|
||||
|
||||
[[package]]
|
||||
name = "mypy-boto3-kms"
|
||||
version = "1.26.147"
|
||||
description = "Type annotations for boto3.KMS 1.26.147 service generated with mypy-boto3-builder 7.14.5"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "mypy-boto3-kms-1.26.147.tar.gz", hash = "sha256:816a4d1bb0585e1b9620a3f96c1d69a06f53b7b5621858579dd77c60dbb5fa5c"},
|
||||
{file = "mypy_boto3_kms-1.26.147-py3-none-any.whl", hash = "sha256:493f0db674a25c88769f5cb8ab8ac00d3dda5dfc903d5cda34c990ee64689f79"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mypy-boto3-s3"
|
||||
version = "1.26.0.post1"
|
||||
@@ -2771,18 +2758,18 @@ pytest = ">=5,<8"
|
||||
|
||||
[[package]]
|
||||
name = "pytest-timeout"
|
||||
version = "2.3.1"
|
||||
version = "2.1.0"
|
||||
description = "pytest plugin to abort hanging tests"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
python-versions = ">=3.6"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pytest-timeout-2.3.1.tar.gz", hash = "sha256:12397729125c6ecbdaca01035b9e5239d4db97352320af155b3f5de1ba5165d9"},
|
||||
{file = "pytest_timeout-2.3.1-py3-none-any.whl", hash = "sha256:68188cb703edfc6a18fad98dc25a3c61e9f24d644b0b70f33af545219fc7813e"},
|
||||
{file = "pytest-timeout-2.1.0.tar.gz", hash = "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9"},
|
||||
{file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pytest = ">=7.0.0"
|
||||
pytest = ">=5.0.0"
|
||||
|
||||
[[package]]
|
||||
name = "pytest-xdist"
|
||||
@@ -3820,4 +3807,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "00ddc42c32e235b6171845fc066dcab078282ed832cd464d5e8a0afa959dd04a"
|
||||
content-hash = "4dc3165fe22c0e0f7a030ea0f8a680ae2ff74561d8658c393abbe9112caaf5d7"
|
||||
|
||||
@@ -19,6 +19,7 @@ aws-config.workspace = true
|
||||
aws-sdk-iam.workspace = true
|
||||
aws-sigv4.workspace = true
|
||||
base64.workspace = true
|
||||
boxcar = "0.2.8"
|
||||
bstr.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
@@ -62,6 +63,7 @@ postgres_backend.workspace = true
|
||||
postgres-client = { package = "tokio-postgres2", path = "../libs/proxy/tokio-postgres2" }
|
||||
postgres-protocol = { package = "postgres-protocol2", path = "../libs/proxy/postgres-protocol2" }
|
||||
pq_proto.workspace = true
|
||||
prometheus.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
@@ -79,6 +81,7 @@ sha2 = { workspace = true, features = ["asm", "oid"] }
|
||||
smol_str.workspace = true
|
||||
smallvec.workspace = true
|
||||
socket2.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
subtle.workspace = true
|
||||
thiserror.workspace = true
|
||||
@@ -92,6 +95,7 @@ tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-log.workspace = true
|
||||
tracing-serde.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
try-lock.workspace = true
|
||||
typed-json.workspace = true
|
||||
|
||||
@@ -140,8 +140,9 @@ async fn authenticate(
|
||||
let (psql_session_id, waiter) = loop {
|
||||
let psql_session_id = new_psql_session_id();
|
||||
|
||||
if let Ok(waiter) = control_plane::mgmt::get_waiter(&psql_session_id) {
|
||||
break (psql_session_id, waiter);
|
||||
match control_plane::mgmt::get_waiter(&psql_session_id) {
|
||||
Ok(waiter) => break (psql_session_id, waiter),
|
||||
Err(_e) => continue,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -220,11 +220,11 @@ async fn fetch_jwks(
|
||||
}
|
||||
|
||||
impl JwkCacheEntryLock {
|
||||
async fn acquire_permit(self: &Arc<Self>) -> JwkRenewalPermit<'_> {
|
||||
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
|
||||
JwkRenewalPermit::acquire_permit(self).await
|
||||
}
|
||||
|
||||
fn try_acquire_permit(self: &Arc<Self>) -> Option<JwkRenewalPermit<'_>> {
|
||||
fn try_acquire_permit<'a>(self: &'a Arc<Self>) -> Option<JwkRenewalPermit<'a>> {
|
||||
JwkRenewalPermit::try_acquire_permit(self)
|
||||
}
|
||||
|
||||
@@ -393,7 +393,7 @@ impl JwkCacheEntryLock {
|
||||
verify_rsa_signature(header_payload.as_bytes(), &sig, key, &header.algorithm)?;
|
||||
}
|
||||
key => return Err(JwtError::UnsupportedKeyType(key.into())),
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(?payload, "JWT signature valid with claims");
|
||||
|
||||
@@ -510,7 +510,7 @@ fn verify_rsa_signature(
|
||||
key.verify(data, &sig)?;
|
||||
}
|
||||
_ => return Err(JwtError::InvalidRsaSigningAlgorithm),
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4,20 +4,6 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Parser;
|
||||
use compute_api::spec::LocalProxySpec;
|
||||
use futures::future::Either;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{pid_file, project_build_tag, project_git_version};
|
||||
|
||||
use crate::auth::backend::jwt::JwkCache;
|
||||
use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
|
||||
use crate::auth::{self};
|
||||
@@ -39,10 +25,24 @@ use crate::serverless::{self, GlobalConnPoolOptions};
|
||||
use crate::tls::client_config::compute_client_config_with_root_certs;
|
||||
use crate::types::RoleName;
|
||||
use crate::url::ApiUrl;
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use compute_api::spec::LocalProxySpec;
|
||||
use futures::future::Either;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
project_build_tag!(BUILD_TAG);
|
||||
|
||||
use clap::Parser;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{pid_file, project_build_tag, project_git_version};
|
||||
|
||||
/// Neon proxy/router
|
||||
#[derive(Parser)]
|
||||
#[command(version = GIT_VERSION, about)]
|
||||
|
||||
@@ -5,6 +5,12 @@
|
||||
/// the outside. Similar to an ingress controller for HTTPS.
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use crate::protocol2::ConnectionInfo;
|
||||
use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::tls::TlsServerEndPoint;
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use clap::Arg;
|
||||
use futures::future::Either;
|
||||
@@ -19,13 +25,6 @@ use tracing::{error, info, Instrument};
|
||||
use utils::project_git_version;
|
||||
use utils::sentry_init::init_sentry;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use crate::protocol2::ConnectionInfo;
|
||||
use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::tls::TlsServerEndPoint;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn cli() -> clap::Command {
|
||||
|
||||
@@ -3,16 +3,6 @@ use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use futures::future::Either;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn, Instrument};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{project_build_tag, project_git_version};
|
||||
|
||||
use crate::auth::backend::jwt::JwkCache;
|
||||
use crate::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
|
||||
use crate::cancellation::{handle_cancel_messages, CancellationHandler};
|
||||
@@ -34,6 +24,15 @@ use crate::serverless::cancel_set::CancelSet;
|
||||
use crate::serverless::GlobalConnPoolOptions;
|
||||
use crate::tls::client_config::compute_client_config_with_root_certs;
|
||||
use crate::{auth, control_plane, http, serverless, usage_metrics};
|
||||
use anyhow::bail;
|
||||
use futures::future::Either;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn, Instrument};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{project_build_tag, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
project_build_tag!(BUILD_TAG);
|
||||
@@ -304,7 +303,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
match auth_backend {
|
||||
Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
|
||||
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
|
||||
}
|
||||
};
|
||||
info!("Using region: {}", args.aws_region);
|
||||
|
||||
// TODO: untangle the config args
|
||||
@@ -804,9 +803,8 @@ fn build_auth_backend(
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
use crate::rate_limiter::RateBucketInfo;
|
||||
use clap::Parser;
|
||||
|
||||
#[test]
|
||||
fn parse_endpoint_rps_limit() {
|
||||
|
||||
2
proxy/src/cache/endpoints.rs
vendored
2
proxy/src/cache/endpoints.rs
vendored
@@ -242,7 +242,7 @@ impl EndpointsCache {
|
||||
});
|
||||
tracing::error!("error parsing value {value:?}: {err:?}");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
if total.is_power_of_two() {
|
||||
tracing::debug!("endpoints read {}", total);
|
||||
|
||||
@@ -137,8 +137,8 @@ impl ConnCfg {
|
||||
match k {
|
||||
// Only set `user` if it's not present in the config.
|
||||
// Console redirect auth flow takes username from the console's response.
|
||||
"user" if self.user_is_set() => {}
|
||||
"database" if self.db_is_set() => {}
|
||||
"user" if self.user_is_set() => continue,
|
||||
"database" if self.db_is_set() => continue,
|
||||
"options" => {
|
||||
if let Some(options) = filtered_options(v) {
|
||||
self.set_param(k, &options);
|
||||
|
||||
@@ -82,7 +82,7 @@ pub async fn task_main(
|
||||
error!("per-client task finished with an error: failed to set socket option: {e:#}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(
|
||||
session_id,
|
||||
|
||||
@@ -19,7 +19,8 @@ use crate::cache::{Cached, TimedLru};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
|
||||
use crate::intern::{AccountIdInt, ProjectIdInt};
|
||||
use crate::intern::AccountIdInt;
|
||||
use crate::intern::ProjectIdInt;
|
||||
use crate::types::{EndpointCacheKey, EndpointId};
|
||||
use crate::{compute, scram};
|
||||
|
||||
|
||||
@@ -7,8 +7,9 @@ use chrono::{DateTime, Utc};
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::span;
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{callsite, span, Event, Metadata, Span, Subscriber};
|
||||
use tracing::{callsite, Event, Metadata, Span, Subscriber};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
|
||||
use tracing_subscriber::fmt::format::{Format, Full};
|
||||
|
||||
@@ -119,7 +119,7 @@ pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
// if no more bytes available then exit
|
||||
if bytes_read == 0 {
|
||||
return Ok((ChainRW { inner: read, buf }, ConnectHeader::Missing));
|
||||
}
|
||||
};
|
||||
|
||||
// check if we have enough bytes to continue
|
||||
if let Some(header) = buf.try_get::<ProxyProtocolV2Header>() {
|
||||
@@ -169,7 +169,7 @@ fn process_proxy_payload(
|
||||
header.version_and_command
|
||||
),
|
||||
)),
|
||||
}
|
||||
};
|
||||
|
||||
let size_err =
|
||||
"invalid proxy protocol length. payload not large enough to fit requested IP addresses";
|
||||
|
||||
@@ -198,7 +198,7 @@ where
|
||||
|
||||
warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let wait_duration = retry_after(num_retries, compute.retry);
|
||||
num_retries += 1;
|
||||
|
||||
@@ -118,7 +118,7 @@ pub async fn task_main(
|
||||
error!("per-client task finished with an error: failed to set socket option: {e:#}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(
|
||||
session_id,
|
||||
|
||||
@@ -169,7 +169,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
});
|
||||
tracing::error!("broken message: {e}");
|
||||
}
|
||||
}
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
Ok(msg) => msg,
|
||||
@@ -180,7 +180,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
match serde_json::from_str::<NotificationHeader>(&payload) {
|
||||
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
|
||||
Err(_) => tracing::error!("broken message: {e}"),
|
||||
}
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -372,7 +372,7 @@ impl PoolingBackend {
|
||||
debug!("setting up backend session state");
|
||||
|
||||
// initiates the auth session
|
||||
if let Err(e) = client.batch_execute("select auth.init();").await {
|
||||
if let Err(e) = client.execute("select auth.init()", &[]).await {
|
||||
discard.discard();
|
||||
return Err(e.into());
|
||||
}
|
||||
@@ -651,7 +651,7 @@ async fn connect_http2(
|
||||
e,
|
||||
)));
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
|
||||
@@ -23,6 +23,7 @@ use indexmap::IndexMap;
|
||||
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
|
||||
use parking_lot::RwLock;
|
||||
use postgres_client::tls::NoTlsStream;
|
||||
use postgres_client::types::ToSql;
|
||||
use postgres_client::AsyncMessage;
|
||||
use serde_json::value::RawValue;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -280,9 +281,13 @@ impl ClientInnerCommon<postgres_client::Client> {
|
||||
let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
|
||||
|
||||
// initiates the auth session
|
||||
// this is safe from query injections as the jwt format free of any escape characters.
|
||||
let query = format!("discard all; select auth.jwt_session_init('{token}')");
|
||||
self.inner.batch_execute(&query).await?;
|
||||
self.inner.batch_execute("discard all").await?;
|
||||
self.inner
|
||||
.execute(
|
||||
"select auth.jwt_session_init($1)",
|
||||
&[&&*token as &(dyn ToSql + Sync)],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pid = self.inner.get_process_id();
|
||||
info!(pid, jti = local_data.jti, "user session state init");
|
||||
|
||||
@@ -17,12 +17,12 @@ Jinja2 = "^3.1.5"
|
||||
types-requests = "^2.31.0.0"
|
||||
types-psycopg2 = "^2.9.21.20241019"
|
||||
boto3 = "^1.34.11"
|
||||
boto3-stubs = {extras = ["s3", "kms"], version = "^1.26.16"}
|
||||
boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
|
||||
moto = {extras = ["server"], version = "^5.0.6"}
|
||||
backoff = "^2.2.1"
|
||||
pytest-lazy-fixture = "^0.6.3"
|
||||
prometheus-client = "^0.14.1"
|
||||
pytest-timeout = "^2.3.1"
|
||||
pytest-timeout = "^2.1.0"
|
||||
Werkzeug = "^3.0.6"
|
||||
pytest-order = "^1.1.0"
|
||||
allure-pytest = "^2.13.2"
|
||||
|
||||
@@ -11,7 +11,7 @@ markers =
|
||||
testpaths =
|
||||
test_runner
|
||||
minversion = 6.0
|
||||
log_format = %(asctime)s.%(msecs)03d %(levelname)s [%(filename)s:%(lineno)d] %(message)s
|
||||
log_format = %(asctime)s.%(msecs)-3d %(levelname)s [%(filename)s:%(lineno)d] %(message)s
|
||||
log_date_format = %Y-%m-%d %H:%M:%S
|
||||
log_cli = true
|
||||
timeout = 300
|
||||
|
||||
@@ -13,8 +13,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{auth::Scope, measured_stream::MeasuredStream};
|
||||
|
||||
use std::os::fd::AsRawFd;
|
||||
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
|
||||
@@ -64,7 +62,6 @@ async fn handle_socket(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<(), QueryError> {
|
||||
socket.set_nodelay(true)?;
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
|
||||
// Set timeout on reading from the socket. It prevents hanged up connection
|
||||
@@ -110,7 +107,7 @@ async fn handle_socket(
|
||||
auth_pair,
|
||||
global_timelines,
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
pgbackend
|
||||
|
||||
@@ -26,6 +26,7 @@ humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
governor = {version = "0.8.0"}
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
@@ -34,7 +35,6 @@ reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
regex.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use http_utils::{
|
||||
endpoint::{self, auth_middleware, check_permission_with, request_span},
|
||||
error::ApiError,
|
||||
@@ -32,6 +33,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::num::NonZero;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -516,16 +518,13 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
|
||||
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
|
||||
// compare to, so we can just filter out our well known ID format with regexes.
|
||||
fn path_without_ids(path: &str) -> String {
|
||||
static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
|
||||
ID_REGEX
|
||||
.get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
|
||||
.replace_all(path, "")
|
||||
.to_string()
|
||||
}
|
||||
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
|
||||
RateLimiter<
|
||||
TenantId,
|
||||
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
|
||||
governor::clock::DefaultClock,
|
||||
>,
|
||||
> = std::sync::OnceLock::new();
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
@@ -548,6 +547,19 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Proxied requests are expected to be rare on a per-tenant basis: these are things
|
||||
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
|
||||
// that has high throughput.
|
||||
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
|
||||
RateLimiter::new(
|
||||
Quota::per_second(NonZero::new(10).unwrap()),
|
||||
governor::state::keyed::DefaultKeyedStateStore::new(),
|
||||
governor::clock::DefaultClock::default(),
|
||||
)
|
||||
});
|
||||
|
||||
limiter.until_key_ready(&tenant_id).await;
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
@@ -562,7 +574,10 @@ async fn handle_tenant_timeline_passthrough(
|
||||
.metrics_group
|
||||
.storage_controller_passthrough_request_latency;
|
||||
|
||||
let path_label = path_without_ids(&path)
|
||||
// This is a bit awkward. We remove the param from the request
|
||||
// and join the words by '_' to get a label for the request.
|
||||
let just_path = path.replace(&tenant_shard_str, "");
|
||||
let path_label = just_path
|
||||
.split('/')
|
||||
.filter(|token| !token.is_empty())
|
||||
.collect::<Vec<_>>()
|
||||
@@ -2097,16 +2112,3 @@ pub fn make_router(
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use super::path_without_ids;
|
||||
|
||||
#[test]
|
||||
fn test_path_without_ids() {
|
||||
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788"), "/v1/tenant//timeline/");
|
||||
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788"), "/v1/tenant//timeline/");
|
||||
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo"), "/v1/tenant//timeline/");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,8 +12,7 @@ use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::{
|
||||
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
|
||||
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
|
||||
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
|
||||
};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -76,14 +75,10 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
split_threshold: Option<u64>,
|
||||
|
||||
/// Maximum number of normal-priority reconcilers that may run in parallel
|
||||
/// Maximum number of reconcilers that may run in parallel
|
||||
#[arg(long)]
|
||||
reconciler_concurrency: Option<usize>,
|
||||
|
||||
/// Maximum number of high-priority reconcilers that may run in parallel
|
||||
#[arg(long)]
|
||||
priority_reconciler_concurrency: Option<usize>,
|
||||
|
||||
/// How long to wait for the initial database connection to be available.
|
||||
#[arg(long, default_value = "5s")]
|
||||
db_connect_timeout: humantime::Duration,
|
||||
@@ -294,9 +289,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
reconciler_concurrency: args
|
||||
.reconciler_concurrency
|
||||
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
|
||||
priority_reconciler_concurrency: args
|
||||
.priority_reconciler_concurrency
|
||||
.unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
|
||||
split_threshold: args.split_threshold,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
|
||||
|
||||
@@ -91,10 +91,9 @@ pub(crate) struct ReconcilerConfigBuilder {
|
||||
}
|
||||
|
||||
impl ReconcilerConfigBuilder {
|
||||
/// Priority is special: you must pick one thoughtfully, do not just use 'normal' as the default
|
||||
pub(crate) fn new(priority: ReconcilerPriority) -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
config: ReconcilerConfig::new(priority),
|
||||
config: ReconcilerConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,18 +129,8 @@ impl ReconcilerConfigBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
// Higher priorities are used for user-facing tasks, so that a long backlog of housekeeping work (e.g. reconciling on startup, rescheduling
|
||||
// things on node changes) does not starve user-facing tasks.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub(crate) enum ReconcilerPriority {
|
||||
Normal,
|
||||
High,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Default, Debug, Copy, Clone)]
|
||||
pub(crate) struct ReconcilerConfig {
|
||||
pub(crate) priority: ReconcilerPriority,
|
||||
|
||||
// During live migration give up on warming-up the secondary
|
||||
// after this timeout.
|
||||
secondary_warmup_timeout: Option<Duration>,
|
||||
@@ -156,18 +145,6 @@ pub(crate) struct ReconcilerConfig {
|
||||
}
|
||||
|
||||
impl ReconcilerConfig {
|
||||
/// Configs are always constructed with an explicit priority, to force callers to think about whether
|
||||
/// the operation they're scheduling is high-priority or not. Normal priority is not a safe default, because
|
||||
/// scheduling something user-facing at normal priority can result in it getting starved out by background work.
|
||||
pub(crate) fn new(priority: ReconcilerPriority) -> Self {
|
||||
Self {
|
||||
priority,
|
||||
secondary_warmup_timeout: None,
|
||||
secondary_download_request_timeout: None,
|
||||
tenant_creation_hint: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
|
||||
const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
|
||||
self.secondary_warmup_timeout
|
||||
@@ -187,9 +164,7 @@ impl ReconcilerConfig {
|
||||
|
||||
impl From<&MigrationConfig> for ReconcilerConfig {
|
||||
fn from(value: &MigrationConfig) -> Self {
|
||||
// Run reconciler at high priority because MigrationConfig comes from human requests that should
|
||||
// be presumed urgent.
|
||||
let mut builder = ReconcilerConfigBuilder::new(ReconcilerPriority::High);
|
||||
let mut builder = ReconcilerConfigBuilder::new();
|
||||
|
||||
if let Some(timeout) = value.secondary_warmup_timeout {
|
||||
builder = builder.secondary_warmup_timeout(timeout)
|
||||
|
||||
@@ -30,10 +30,7 @@ use crate::{
|
||||
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{
|
||||
ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder,
|
||||
ReconcilerPriority,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
safekeeper::Safekeeper,
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
@@ -82,7 +79,7 @@ use pageserver_api::{
|
||||
},
|
||||
};
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use tokio::sync::{mpsc::error::TrySendError, TryAcquireError};
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
completion::Barrier,
|
||||
@@ -198,7 +195,6 @@ pub(crate) enum LeadershipStatus {
|
||||
}
|
||||
|
||||
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
|
||||
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
|
||||
|
||||
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
|
||||
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
|
||||
@@ -370,12 +366,9 @@ pub struct Config {
|
||||
/// and/or upon handling the re-attach request from a node.
|
||||
pub max_warming_up_interval: Duration,
|
||||
|
||||
/// How many normal-priority Reconcilers may be spawned concurrently
|
||||
/// How many Reconcilers may be spawned concurrently
|
||||
pub reconciler_concurrency: usize,
|
||||
|
||||
/// How many high-priority Reconcilers may be spawned concurrently
|
||||
pub priority_reconciler_concurrency: usize,
|
||||
|
||||
/// How large must a shard grow in bytes before we split it?
|
||||
/// None disables auto-splitting.
|
||||
pub split_threshold: Option<u64>,
|
||||
@@ -443,14 +436,9 @@ pub struct Service {
|
||||
// that transition it to/from Active.
|
||||
node_op_locks: IdLockMap<NodeId, NodeOperations>,
|
||||
|
||||
// Limit how many Reconcilers we will spawn concurrently for normal-priority tasks such as background reconciliations
|
||||
// and reconciliation on startup.
|
||||
// Limit how many Reconcilers we will spawn concurrently
|
||||
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
// Limit how many Reconcilers we will spawn concurrently for high-priority tasks such as tenant/timeline CRUD, which
|
||||
// a human user might be waiting for.
|
||||
priority_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
|
||||
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
|
||||
///
|
||||
@@ -1275,15 +1263,12 @@ impl Service {
|
||||
}
|
||||
|
||||
// Maybe some other work can proceed now that this job finished.
|
||||
//
|
||||
// Only bother with this if we have some semaphore units available in the normal-priority semaphore (these
|
||||
// reconciles are scheduled at `[ReconcilerPriority::Normal]`).
|
||||
if self.reconciler_concurrency.available_permits() > 0 {
|
||||
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
|
||||
shard.delayed_reconcile = false;
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
|
||||
self.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
if self.reconciler_concurrency.available_permits() == 0 {
|
||||
@@ -1580,9 +1565,6 @@ impl Service {
|
||||
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
|
||||
config.reconciler_concurrency,
|
||||
)),
|
||||
priority_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
|
||||
config.priority_reconciler_concurrency,
|
||||
)),
|
||||
delayed_reconcile_tx,
|
||||
abort_tx,
|
||||
startup_complete: startup_complete.clone(),
|
||||
@@ -2355,7 +2337,7 @@ impl Service {
|
||||
let waiters = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let config = ReconcilerConfigBuilder::new(ReconcilerPriority::High)
|
||||
let config = ReconcilerConfigBuilder::new()
|
||||
.tenant_creation_hint(true)
|
||||
.build();
|
||||
tenants
|
||||
@@ -2830,8 +2812,7 @@ impl Service {
|
||||
|
||||
shard.schedule(scheduler, &mut schedule_context)?;
|
||||
|
||||
let maybe_waiter =
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
|
||||
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
@@ -2952,9 +2933,7 @@ impl Service {
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
shard.config = config.clone();
|
||||
if let Some(waiter) =
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
|
||||
{
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
@@ -3236,9 +3215,7 @@ impl Service {
|
||||
debug_assert!(shard.intent.get_attached().is_none());
|
||||
debug_assert!(shard.intent.get_secondary().is_empty());
|
||||
|
||||
if let Some(waiter) =
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
|
||||
{
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
|
||||
detach_waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
@@ -3390,7 +3367,7 @@ impl Service {
|
||||
|
||||
// In case scheduling is being switched back on, try it now.
|
||||
shard.schedule(scheduler, &mut schedule_context).ok();
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
|
||||
self.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -4439,7 +4416,7 @@ impl Service {
|
||||
tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
|
||||
}
|
||||
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
|
||||
self.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
// We don't expect any new_shard_count shards to exist here, but drop them just in case
|
||||
@@ -4605,11 +4582,7 @@ impl Service {
|
||||
tracing::warn!("Failed to schedule child shard {child}: {e}");
|
||||
}
|
||||
// In the background, attach secondary locations for the new shards
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(
|
||||
&mut child_state,
|
||||
nodes,
|
||||
ReconcilerPriority::High,
|
||||
) {
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
|
||||
@@ -4974,9 +4947,7 @@ impl Service {
|
||||
shard.intent.clear_secondary(scheduler);
|
||||
|
||||
// Run Reconciler to execute detach fo secondary locations.
|
||||
if let Some(waiter) =
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
|
||||
{
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
@@ -5244,7 +5215,7 @@ impl Service {
|
||||
|
||||
let reconciler_config = match migrate_req.migration_config {
|
||||
Some(cfg) => (&cfg).into(),
|
||||
None => ReconcilerConfig::new(ReconcilerPriority::High),
|
||||
None => ReconcilerConfig::default(),
|
||||
};
|
||||
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
|
||||
@@ -5310,7 +5281,7 @@ impl Service {
|
||||
);
|
||||
}
|
||||
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
|
||||
self.maybe_reconcile_shard(shard, nodes)
|
||||
};
|
||||
|
||||
if let Some(waiter) = waiter {
|
||||
@@ -5722,7 +5693,7 @@ impl Service {
|
||||
)
|
||||
}
|
||||
|
||||
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
|
||||
self.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
// Here we remove an existing observed location for the node we're removing, and it will
|
||||
@@ -6091,14 +6062,7 @@ impl Service {
|
||||
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
if self
|
||||
.maybe_reconcile_shard(
|
||||
tenant_shard,
|
||||
nodes,
|
||||
ReconcilerPriority::Normal,
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
|
||||
tenants_affected += 1;
|
||||
};
|
||||
}
|
||||
@@ -6129,11 +6093,7 @@ impl Service {
|
||||
|
||||
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
|
||||
if observed_loc.conf.is_none() {
|
||||
self.maybe_reconcile_shard(
|
||||
tenant_shard,
|
||||
nodes,
|
||||
ReconcilerPriority::Normal,
|
||||
);
|
||||
self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6497,36 +6457,8 @@ impl Service {
|
||||
&self,
|
||||
shard: &mut TenantShard,
|
||||
nodes: &Arc<HashMap<NodeId, Node>>,
|
||||
priority: ReconcilerPriority,
|
||||
) -> Option<ReconcilerWaiter> {
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::new(priority))
|
||||
}
|
||||
|
||||
/// Before constructing a Reconciler, acquire semaphore units from the appropriate concurrency limit (depends on priority)
|
||||
fn get_reconciler_units(
|
||||
&self,
|
||||
priority: ReconcilerPriority,
|
||||
) -> Result<ReconcileUnits, TryAcquireError> {
|
||||
let units = match priority {
|
||||
ReconcilerPriority::Normal => self.reconciler_concurrency.clone().try_acquire_owned(),
|
||||
ReconcilerPriority::High => {
|
||||
match self
|
||||
.priority_reconciler_concurrency
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
{
|
||||
Ok(u) => Ok(u),
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
// If the high priority semaphore is exhausted, then high priority tasks may steal units from
|
||||
// the normal priority semaphore.
|
||||
self.reconciler_concurrency.clone().try_acquire_owned()
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
units.map(ReconcileUnits::new)
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
|
||||
}
|
||||
|
||||
/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
|
||||
@@ -6546,8 +6478,8 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
let units = match self.get_reconciler_units(reconciler_config.priority) {
|
||||
Ok(u) => u,
|
||||
let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
|
||||
Ok(u) => ReconcileUnits::new(u),
|
||||
Err(_) => {
|
||||
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
|
||||
"Concurrency limited: enqueued for reconcile later");
|
||||
@@ -6640,10 +6572,7 @@ impl Service {
|
||||
|
||||
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
|
||||
// dirty, spawn another rone
|
||||
if self
|
||||
.maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
|
||||
.is_some()
|
||||
{
|
||||
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
|
||||
reconciles_spawned += 1;
|
||||
} else if shard.delayed_reconcile {
|
||||
// Shard wanted to reconcile but for some reason couldn't.
|
||||
@@ -6729,10 +6658,7 @@ impl Service {
|
||||
tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
|
||||
if shard.apply_optimization(scheduler, optimization) {
|
||||
optimizations_applied += 1;
|
||||
if self
|
||||
.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal)
|
||||
.is_some()
|
||||
{
|
||||
if self.maybe_reconcile_shard(shard, nodes).is_some() {
|
||||
reconciles_spawned += 1;
|
||||
}
|
||||
}
|
||||
@@ -7282,7 +7208,7 @@ impl Service {
|
||||
// to not stall the operation when a cold secondary is encountered.
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
let reconciler_config = ReconcilerConfigBuilder::new()
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
@@ -7615,7 +7541,7 @@ impl Service {
|
||||
) -> Result<(), OperationError> {
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
let reconciler_config = ReconcilerConfigBuilder::new()
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
|
||||
@@ -88,11 +88,7 @@ impl ChaosInjector {
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(
|
||||
shard,
|
||||
nodes,
|
||||
crate::reconciler::ReconcilerPriority::Normal,
|
||||
);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
async fn inject_chaos(&mut self) {
|
||||
|
||||
@@ -4,10 +4,8 @@ import subprocess
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_cli import AbstractNeonCli
|
||||
@@ -25,7 +23,6 @@ class FastImport(AbstractNeonCli):
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
workdir: Path,
|
||||
cleanup: bool = True,
|
||||
):
|
||||
if extra_env is None:
|
||||
env_vars = {}
|
||||
@@ -50,43 +47,12 @@ class FastImport(AbstractNeonCli):
|
||||
if not workdir.exists():
|
||||
raise Exception(f"Working directory '{workdir}' does not exist")
|
||||
self.workdir = workdir
|
||||
self.cleanup = cleanup
|
||||
|
||||
def run_pgdata(
|
||||
self,
|
||||
s3prefix: str | None = None,
|
||||
pg_port: int | None = None,
|
||||
source_connection_string: str | None = None,
|
||||
interactive: bool = False,
|
||||
):
|
||||
return self.run(
|
||||
"pgdata",
|
||||
s3prefix=s3prefix,
|
||||
pg_port=pg_port,
|
||||
source_connection_string=source_connection_string,
|
||||
interactive=interactive,
|
||||
)
|
||||
|
||||
def run_dump_restore(
|
||||
self,
|
||||
s3prefix: str | None = None,
|
||||
source_connection_string: str | None = None,
|
||||
destination_connection_string: str | None = None,
|
||||
):
|
||||
return self.run(
|
||||
"dump-restore",
|
||||
s3prefix=s3prefix,
|
||||
source_connection_string=source_connection_string,
|
||||
destination_connection_string=destination_connection_string,
|
||||
)
|
||||
|
||||
def run(
|
||||
self,
|
||||
command: str,
|
||||
s3prefix: str | None = None,
|
||||
pg_port: int | None = None,
|
||||
pg_port: int,
|
||||
source_connection_string: str | None = None,
|
||||
destination_connection_string: str | None = None,
|
||||
s3prefix: str | None = None,
|
||||
interactive: bool = False,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
if self.cmd is not None:
|
||||
@@ -94,17 +60,13 @@ class FastImport(AbstractNeonCli):
|
||||
args = [
|
||||
f"--pg-bin-dir={self.pg_bin}",
|
||||
f"--pg-lib-dir={self.pg_lib}",
|
||||
f"--pg-port={pg_port}",
|
||||
f"--working-directory={self.workdir}",
|
||||
]
|
||||
if s3prefix is not None:
|
||||
args.append(f"--s3-prefix={s3prefix}")
|
||||
args.append(command)
|
||||
if pg_port is not None:
|
||||
args.append(f"--pg-port={pg_port}")
|
||||
if source_connection_string is not None:
|
||||
args.append(f"--source-connection-string={source_connection_string}")
|
||||
if destination_connection_string is not None:
|
||||
args.append(f"--destination-connection-string={destination_connection_string}")
|
||||
if s3prefix is not None:
|
||||
args.append(f"--s3-prefix={s3prefix}")
|
||||
if interactive:
|
||||
args.append("--interactive")
|
||||
|
||||
@@ -115,7 +77,7 @@ class FastImport(AbstractNeonCli):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
if self.workdir.exists() and self.cleanup:
|
||||
if self.workdir.exists():
|
||||
shutil.rmtree(self.workdir)
|
||||
|
||||
|
||||
@@ -125,17 +87,9 @@ def fast_import(
|
||||
test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pytestconfig: Config,
|
||||
) -> Iterator[FastImport]:
|
||||
workdir = Path(tempfile.mkdtemp(dir=test_output_dir, prefix="fast_import_"))
|
||||
with FastImport(
|
||||
None,
|
||||
neon_binpath,
|
||||
pg_distrib_dir,
|
||||
pg_version,
|
||||
workdir,
|
||||
cleanup=not cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
) as fi:
|
||||
workdir = Path(tempfile.mkdtemp())
|
||||
with FastImport(None, neon_binpath, pg_distrib_dir, pg_version, workdir) as fi:
|
||||
yield fi
|
||||
|
||||
if fi.cmd is None:
|
||||
|
||||
@@ -27,7 +27,6 @@ from urllib.parse import quote, urlparse
|
||||
|
||||
import asyncpg
|
||||
import backoff
|
||||
import boto3
|
||||
import httpx
|
||||
import psycopg2
|
||||
import psycopg2.sql
|
||||
@@ -38,8 +37,6 @@ from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from jwcrypto import jwk
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_s3 import S3Client
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -202,30 +199,6 @@ def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
|
||||
mock_s3_server.kill()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_kms(mock_s3_server: MockS3Server) -> Iterator[KMSClient]:
|
||||
yield boto3.client(
|
||||
"kms",
|
||||
endpoint_url=mock_s3_server.endpoint(),
|
||||
region_name=mock_s3_server.region(),
|
||||
aws_access_key_id=mock_s3_server.access_key(),
|
||||
aws_secret_access_key=mock_s3_server.secret_key(),
|
||||
aws_session_token=mock_s3_server.session_token(),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_s3_client(mock_s3_server: MockS3Server) -> Iterator[S3Client]:
|
||||
yield boto3.client(
|
||||
"s3",
|
||||
endpoint_url=mock_s3_server.endpoint(),
|
||||
region_name=mock_s3_server.region(),
|
||||
aws_access_key_id=mock_s3_server.access_key(),
|
||||
aws_secret_access_key=mock_s3_server.secret_key(),
|
||||
aws_session_token=mock_s3_server.session_token(),
|
||||
)
|
||||
|
||||
|
||||
class PgProtocol:
|
||||
"""Reusable connection logic"""
|
||||
|
||||
@@ -491,7 +464,6 @@ class NeonEnvBuilder:
|
||||
self.test_may_use_compatibility_snapshot_binaries = False
|
||||
self.version_combination = combination
|
||||
self.mixdir = self.test_output_dir / "mixdir_neon"
|
||||
|
||||
if self.version_combination is not None:
|
||||
assert (
|
||||
self.compatibility_neon_binpath is not None
|
||||
@@ -703,11 +675,6 @@ class NeonEnvBuilder:
|
||||
|
||||
def _mix_versions(self):
|
||||
assert self.version_combination is not None, "version combination must be set"
|
||||
|
||||
# Always use a newer version of `neon_local`
|
||||
(self.mixdir / "neon_local").symlink_to(self.neon_binpath / "neon_local")
|
||||
self.neon_local_binpath = self.mixdir
|
||||
|
||||
for component, paths in COMPONENT_BINARIES.items():
|
||||
directory = (
|
||||
self.neon_binpath
|
||||
@@ -717,10 +684,9 @@ class NeonEnvBuilder:
|
||||
for filename in paths:
|
||||
destination = self.mixdir / filename
|
||||
destination.symlink_to(directory / filename)
|
||||
self.neon_binpath = self.mixdir
|
||||
|
||||
if self.version_combination["compute"] == "old":
|
||||
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
|
||||
self.neon_binpath = self.mixdir
|
||||
|
||||
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
|
||||
"""
|
||||
|
||||
@@ -52,11 +52,11 @@ COMPONENT_BINARIES = {
|
||||
# Disable auto-formatting for better readability
|
||||
# fmt: off
|
||||
VERSIONS_COMBINATIONS = (
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnnn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"}, # combination: ooonn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"}, # combination: ononn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"}, # combination: onnnn
|
||||
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnoo
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
|
||||
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
@@ -16,12 +14,8 @@ from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
PageserverApiException,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -109,15 +103,13 @@ def test_pgdata_import_smoke(
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
|
||||
f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages"
|
||||
)
|
||||
if relblock_size >= target_relblock_size:
|
||||
break
|
||||
addrows = int((target_relblock_size - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})")
|
||||
nrows += addrows
|
||||
expect_nrows = nrows
|
||||
expect_sum = (
|
||||
@@ -340,224 +332,6 @@ def test_pgdata_import_smoke(
|
||||
br_initdb_endpoint.safe_psql("select * from othertable")
|
||||
|
||||
|
||||
def test_fast_import_with_pageserver_ingest(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
fast_import: FastImport,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
mock_s3_server: MockS3Server,
|
||||
mock_kms: KMSClient,
|
||||
mock_s3_client: S3Client,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
make_httpserver: HTTPServer,
|
||||
):
|
||||
# Prepare KMS and S3
|
||||
key_response = mock_kms.create_key(
|
||||
Description="Test key",
|
||||
KeyUsage="ENCRYPT_DECRYPT",
|
||||
Origin="AWS_KMS",
|
||||
)
|
||||
key_id = key_response["KeyMetadata"]["KeyId"]
|
||||
|
||||
def encrypt(x: str) -> EncryptResponseTypeDef:
|
||||
return mock_kms.encrypt(KeyId=key_id, Plaintext=x)
|
||||
|
||||
# Start source postgres and ingest data
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
|
||||
|
||||
# Setup pageserver and fake cplane for import progress
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"control plane request: {request.json}")
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
|
||||
# because import_pgdata code uses this endpoint, not the one in common remote storage config
|
||||
# TODO: maybe use common remote_storage config in pageserver?
|
||||
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
|
||||
}
|
||||
)
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Encrypt connstrings and put spec into S3
|
||||
source_connstring_encrypted = encrypt(vanilla_pg.connstr())
|
||||
spec = {
|
||||
"encryption_secret": {"KMS": {"key_id": key_id}},
|
||||
"source_connstring_ciphertext_base64": base64.b64encode(
|
||||
source_connstring_encrypted["CiphertextBlob"]
|
||||
).decode("utf-8"),
|
||||
"project_id": "someproject",
|
||||
"branch_id": "somebranch",
|
||||
}
|
||||
|
||||
bucket = "test-bucket"
|
||||
key_prefix = "test-prefix"
|
||||
mock_s3_client.create_bucket(Bucket=bucket)
|
||||
mock_s3_client.put_object(Bucket=bucket, Key=f"{key_prefix}/spec.json", Body=json.dumps(spec))
|
||||
|
||||
# Create timeline with import_pgdata
|
||||
tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(tenant_id)
|
||||
|
||||
timeline_id = TimelineId.generate()
|
||||
log.info("starting import")
|
||||
start = time.monotonic()
|
||||
|
||||
idempotency = ImportPgdataIdemptencyKey.random()
|
||||
log.info(f"idempotency key {idempotency}")
|
||||
# TODO: teach neon_local CLI about the idempotency & 429 error so we can run inside the loop
|
||||
# and check for 429
|
||||
|
||||
import_branch_name = "imported"
|
||||
env.storage_controller.timeline_create(
|
||||
tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {
|
||||
"AwsS3": {
|
||||
"region": env.s3_mock_server.region(),
|
||||
"bucket": bucket,
|
||||
"key": key_prefix,
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
|
||||
|
||||
# Run fast_import
|
||||
if fast_import.extra_env is None:
|
||||
fast_import.extra_env = {}
|
||||
fast_import.extra_env["AWS_ACCESS_KEY_ID"] = mock_s3_server.access_key()
|
||||
fast_import.extra_env["AWS_SECRET_ACCESS_KEY"] = mock_s3_server.secret_key()
|
||||
fast_import.extra_env["AWS_SESSION_TOKEN"] = mock_s3_server.session_token()
|
||||
fast_import.extra_env["AWS_REGION"] = mock_s3_server.region()
|
||||
fast_import.extra_env["AWS_ENDPOINT_URL"] = mock_s3_server.endpoint()
|
||||
fast_import.extra_env["RUST_LOG"] = "aws_config=debug,aws_sdk_kms=debug"
|
||||
pg_port = port_distributor.get_port()
|
||||
fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")
|
||||
vanilla_pg.stop()
|
||||
|
||||
def validate_vanilla_equivalence(ep):
|
||||
res = ep.safe_psql("SELECT count(*), sum(a) FROM foo;", dbname="neondb")
|
||||
assert res[0] == (10, 55), f"got result: {res}"
|
||||
|
||||
# Sanity check that data in pgdata is expected:
|
||||
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
|
||||
with VanillaPostgres(
|
||||
fast_import.workdir / "pgdata", pgbin, pg_port, False
|
||||
) as new_pgdata_vanilla_pg:
|
||||
new_pgdata_vanilla_pg.start()
|
||||
|
||||
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
|
||||
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
|
||||
validate_vanilla_equivalence(conn)
|
||||
|
||||
# Poll pageserver statuses in s3
|
||||
while True:
|
||||
locations = env.storage_controller.locate(tenant_id)
|
||||
active_count = 0
|
||||
for location in locations:
|
||||
shard_id = TenantShardId.parse(location["shard_id"])
|
||||
ps = env.get_pageserver(location["node_id"])
|
||||
try:
|
||||
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
|
||||
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
|
||||
state = detail["state"]
|
||||
log.info(f"shard {shard_id} state: {state}")
|
||||
if state == "Active":
|
||||
active_count += 1
|
||||
except PageserverApiException as e:
|
||||
if e.status_code == 404:
|
||||
log.info("not found, import is in progress")
|
||||
continue
|
||||
elif e.status_code == 429:
|
||||
log.info("import is in progress")
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
|
||||
if state == "Active":
|
||||
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
|
||||
shard_status_file_contents = (
|
||||
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
|
||||
)
|
||||
shard_status = json.loads(shard_status_file_contents)
|
||||
assert shard_status["done"] is True
|
||||
|
||||
if active_count == len(locations):
|
||||
log.info("all shards are active")
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
import_duration = time.monotonic() - start
|
||||
log.info(f"import complete; duration={import_duration:.2f}s")
|
||||
|
||||
ep = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
|
||||
|
||||
# check that data is there
|
||||
validate_vanilla_equivalence(ep)
|
||||
|
||||
# check that we can do basic ops
|
||||
|
||||
ep.safe_psql("create table othertable(values text)", dbname="neondb")
|
||||
rw_lsn = Lsn(ep.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
|
||||
ep.stop()
|
||||
|
||||
# ... at the tip
|
||||
_ = env.create_branch(
|
||||
new_branch_name="br-tip",
|
||||
ancestor_branch_name=import_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
ancestor_start_lsn=rw_lsn,
|
||||
)
|
||||
br_tip_endpoint = env.endpoints.create_start(
|
||||
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id
|
||||
)
|
||||
validate_vanilla_equivalence(br_tip_endpoint)
|
||||
br_tip_endpoint.safe_psql("select * from othertable", dbname="neondb")
|
||||
br_tip_endpoint.stop()
|
||||
|
||||
# ... at the initdb lsn
|
||||
locations = env.storage_controller.locate(tenant_id)
|
||||
[shard_zero] = [
|
||||
loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
|
||||
]
|
||||
shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
|
||||
shard_zero_timeline_info = shard_zero_ps.http_client().timeline_detail(
|
||||
shard_zero["shard_id"], timeline_id
|
||||
)
|
||||
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
|
||||
_ = env.create_branch(
|
||||
new_branch_name="br-initdb",
|
||||
ancestor_branch_name=import_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
ancestor_start_lsn=initdb_lsn,
|
||||
)
|
||||
br_initdb_endpoint = env.endpoints.create_start(
|
||||
branch_name="br-initdb", endpoint_id="br-initdb-ro", tenant_id=tenant_id
|
||||
)
|
||||
validate_vanilla_equivalence(br_initdb_endpoint)
|
||||
with pytest.raises(psycopg2.errors.UndefinedTable):
|
||||
br_initdb_endpoint.safe_psql("select * from othertable", dbname="neondb")
|
||||
br_initdb_endpoint.stop()
|
||||
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
|
||||
def test_fast_import_binary(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
@@ -568,7 +342,7 @@ def test_fast_import_binary(
|
||||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
|
||||
fast_import.run(pg_port, vanilla_pg.connstr())
|
||||
vanilla_pg.stop()
|
||||
|
||||
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
|
||||
@@ -584,118 +358,6 @@ def test_fast_import_binary(
|
||||
assert res[0][0] == 10
|
||||
|
||||
|
||||
def test_fast_import_restore_to_connstring(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
fast_import: FastImport,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
|
||||
|
||||
pgdatadir = test_output_dir / "destination-pgdata"
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(pgdatadir, pg_bin, port) as destination_vanilla_pg:
|
||||
destination_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
|
||||
destination_vanilla_pg.start()
|
||||
|
||||
# create another database & role and try to restore there
|
||||
destination_vanilla_pg.safe_psql("""
|
||||
CREATE ROLE testrole WITH
|
||||
LOGIN
|
||||
PASSWORD 'testpassword'
|
||||
NOSUPERUSER
|
||||
NOCREATEDB
|
||||
NOCREATEROLE;
|
||||
""")
|
||||
destination_vanilla_pg.safe_psql("CREATE DATABASE testdb OWNER testrole;")
|
||||
|
||||
destination_connstring = destination_vanilla_pg.connstr(
|
||||
dbname="testdb", user="testrole", password="testpassword"
|
||||
)
|
||||
fast_import.run_dump_restore(
|
||||
source_connection_string=vanilla_pg.connstr(),
|
||||
destination_connection_string=destination_connstring,
|
||||
)
|
||||
vanilla_pg.stop()
|
||||
conn = PgProtocol(dsn=destination_connstring)
|
||||
res = conn.safe_psql("SELECT count(*) FROM foo;")
|
||||
log.info(f"Result: {res}")
|
||||
assert res[0][0] == 10
|
||||
|
||||
|
||||
def test_fast_import_restore_to_connstring_from_s3_spec(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
fast_import: FastImport,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
mock_s3_server: MockS3Server,
|
||||
mock_kms: KMSClient,
|
||||
mock_s3_client: S3Client,
|
||||
):
|
||||
# Prepare KMS and S3
|
||||
key_response = mock_kms.create_key(
|
||||
Description="Test key",
|
||||
KeyUsage="ENCRYPT_DECRYPT",
|
||||
Origin="AWS_KMS",
|
||||
)
|
||||
key_id = key_response["KeyMetadata"]["KeyId"]
|
||||
|
||||
def encrypt(x: str) -> EncryptResponseTypeDef:
|
||||
return mock_kms.encrypt(KeyId=key_id, Plaintext=x)
|
||||
|
||||
# Start source postgres and ingest data
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
|
||||
|
||||
# Start target postgres
|
||||
pgdatadir = test_output_dir / "destination-pgdata"
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(pgdatadir, pg_bin, port) as destination_vanilla_pg:
|
||||
destination_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
|
||||
destination_vanilla_pg.start()
|
||||
|
||||
# Encrypt connstrings and put spec into S3
|
||||
source_connstring_encrypted = encrypt(vanilla_pg.connstr())
|
||||
destination_connstring_encrypted = encrypt(destination_vanilla_pg.connstr())
|
||||
spec = {
|
||||
"encryption_secret": {"KMS": {"key_id": key_id}},
|
||||
"source_connstring_ciphertext_base64": base64.b64encode(
|
||||
source_connstring_encrypted["CiphertextBlob"]
|
||||
).decode("utf-8"),
|
||||
"destination_connstring_ciphertext_base64": base64.b64encode(
|
||||
destination_connstring_encrypted["CiphertextBlob"]
|
||||
).decode("utf-8"),
|
||||
}
|
||||
|
||||
mock_s3_client.create_bucket(Bucket="test-bucket")
|
||||
mock_s3_client.put_object(
|
||||
Bucket="test-bucket", Key="test-prefix/spec.json", Body=json.dumps(spec)
|
||||
)
|
||||
|
||||
# Run fast_import
|
||||
if fast_import.extra_env is None:
|
||||
fast_import.extra_env = {}
|
||||
fast_import.extra_env["AWS_ACCESS_KEY_ID"] = mock_s3_server.access_key()
|
||||
fast_import.extra_env["AWS_SECRET_ACCESS_KEY"] = mock_s3_server.secret_key()
|
||||
fast_import.extra_env["AWS_SESSION_TOKEN"] = mock_s3_server.session_token()
|
||||
fast_import.extra_env["AWS_REGION"] = mock_s3_server.region()
|
||||
fast_import.extra_env["AWS_ENDPOINT_URL"] = mock_s3_server.endpoint()
|
||||
fast_import.extra_env["RUST_LOG"] = "aws_config=debug,aws_sdk_kms=debug"
|
||||
fast_import.run_dump_restore(s3prefix="s3://test-bucket/test-prefix")
|
||||
vanilla_pg.stop()
|
||||
|
||||
res = destination_vanilla_pg.safe_psql("SELECT count(*) FROM foo;")
|
||||
log.info(f"Result: {res}")
|
||||
assert res[0][0] == 10
|
||||
|
||||
|
||||
# TODO: Maybe test with pageserver?
|
||||
# 1. run whole neon env
|
||||
# 2. create timeline with some s3 path???
|
||||
|
||||
@@ -72,11 +72,6 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
|
||||
thread.join()
|
||||
|
||||
# Fill LFC: seqscan should fetch the whole table in cache.
|
||||
# It is needed for further correct evaluation of LFC file size
|
||||
# (a sparse chunk of LFC takes less than 1 MB on disk).
|
||||
cur.execute("select sum(abalance) from pgbench_accounts")
|
||||
|
||||
# Before shrinking the cache, check that it really is large now
|
||||
(lfc_file_size, lfc_file_blocks) = get_lfc_size()
|
||||
assert int(lfc_file_blocks) > 128 * 1024
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
|
||||
|
||||
def test_pageserver_reldir_v2(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"rel_size_v2_enabled": "false",
|
||||
}
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
# Create a relation in v1
|
||||
endpoint.safe_psql("CREATE TABLE foo1 (id INTEGER PRIMARY KEY, val text)")
|
||||
endpoint.safe_psql("CREATE TABLE foo2 (id INTEGER PRIMARY KEY, val text)")
|
||||
|
||||
# Switch to v2
|
||||
env.pageserver.http_client().update_tenant_config(
|
||||
env.initial_tenant,
|
||||
{
|
||||
"rel_size_v2_enabled": True,
|
||||
},
|
||||
)
|
||||
|
||||
# Check if both relations are still accessible
|
||||
endpoint.safe_psql("SELECT * FROM foo1")
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
|
||||
# Restart the endpoint
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
# Check if both relations are still accessible again after restart
|
||||
endpoint.safe_psql("SELECT * FROM foo1")
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
|
||||
# Create a relation in v2
|
||||
endpoint.safe_psql("CREATE TABLE foo3 (id INTEGER PRIMARY KEY, val text)")
|
||||
# Delete a relation in v1
|
||||
endpoint.safe_psql("DROP TABLE foo1")
|
||||
|
||||
# Check if both relations are still accessible
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
endpoint.safe_psql("SELECT * FROM foo3")
|
||||
|
||||
# Restart the endpoint
|
||||
endpoint.stop()
|
||||
# This will acquire a basebackup, which lists all relations.
|
||||
endpoint.start()
|
||||
|
||||
# Check if both relations are still accessible
|
||||
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
endpoint.safe_psql("SELECT * FROM foo3")
|
||||
|
||||
endpoint.safe_psql("DROP TABLE foo3")
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
# Check if relations are still accessible
|
||||
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
|
||||
endpoint.safe_psql("SELECT * FROM foo2")
|
||||
endpoint.safe_psql("DROP TABLE IF EXISTS foo3")
|
||||
@@ -481,8 +481,7 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
|
||||
counts = timeline_detail["directory_entries_counts"]
|
||||
assert counts
|
||||
log.info(f"directory counts: {counts}")
|
||||
# We need to add up reldir v1 + v2 counts
|
||||
assert counts[2] + counts[7] > COUNT_AT_LEAST_EXPECTED
|
||||
assert counts[2] > COUNT_AT_LEAST_EXPECTED
|
||||
|
||||
|
||||
def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
|
||||
|
||||
@@ -1445,7 +1445,6 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# roughly fills one segment
|
||||
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
|
||||
endpoint.stop() # stop compute
|
||||
|
||||
@@ -1474,15 +1473,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
"flush_lsn to get aligned",
|
||||
)
|
||||
|
||||
sk1_digest = sk1.http_client().timeline_digest(
|
||||
tenant_id, timeline_id, sk1.get_timeline_start_lsn(tenant_id, timeline_id), lsn
|
||||
)
|
||||
|
||||
sk2_digest = sk1.http_client().timeline_digest(
|
||||
tenant_id, timeline_id, sk2.get_timeline_start_lsn(tenant_id, timeline_id), lsn
|
||||
)
|
||||
|
||||
assert sk1_digest == sk2_digest
|
||||
cmp_sk_wal([sk1, sk2], tenant_id, timeline_id)
|
||||
|
||||
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
|
||||
env.safekeepers[2].stop()
|
||||
|
||||
Reference in New Issue
Block a user