mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Compare commits
16 Commits
bodobolero
...
grant-jwt-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6c661ccb6 | ||
|
|
0abf0f6dce | ||
|
|
878135fe9c | ||
|
|
75434060a5 | ||
|
|
721803a0e7 | ||
|
|
108a211917 | ||
|
|
72ef0e0fa1 | ||
|
|
eb23d355a9 | ||
|
|
bee04b8a69 | ||
|
|
63e7fab990 | ||
|
|
a181392738 | ||
|
|
fc7397122c | ||
|
|
cc599e23c1 | ||
|
|
54d1185789 | ||
|
|
8a138db8b7 | ||
|
|
211970f0e0 |
41
.github/workflows/report-workflow-stats.yml
vendored
Normal file
41
.github/workflows/report-workflow-stats.yml
vendored
Normal file
@@ -0,0 +1,41 @@
|
||||
name: Report Workflow Stats
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows:
|
||||
- Add `external` label to issues and PRs created by external users
|
||||
- Benchmarking
|
||||
- Build and Test
|
||||
- Build and Test Locally
|
||||
- Build build-tools image
|
||||
- Check Permissions
|
||||
- Check build-tools image
|
||||
- Check neon with extra platform builds
|
||||
- Cloud Regression Test
|
||||
- Create Release Branch
|
||||
- Handle `approved-for-ci-run` label
|
||||
- Lint GitHub Workflows
|
||||
- Notify Slack channel about upcoming release
|
||||
- Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
|
||||
- Pin build-tools image
|
||||
- Prepare benchmarking databases by restoring dumps
|
||||
- Push images to ACR
|
||||
- Test Postgres client libraries
|
||||
- Trigger E2E Tests
|
||||
- cleanup caches by a branch
|
||||
types: [completed]
|
||||
|
||||
jobs:
|
||||
gh-workflow-stats:
|
||||
name: Github Workflow Stats
|
||||
runs-on: ubuntu-22.04
|
||||
permissions:
|
||||
actions: read
|
||||
steps:
|
||||
- name: Export GH Workflow Stats
|
||||
uses: fedordikarev/gh-workflow-stats-action@v0.1.2
|
||||
with:
|
||||
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
|
||||
DB_TABLE: "gh_workflow_stats_neon"
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
GH_RUN_ID: ${{ github.event.workflow_run.id }}
|
||||
18
Cargo.lock
generated
18
Cargo.lock
generated
@@ -1820,6 +1820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47"
|
||||
dependencies = [
|
||||
"base16ct 0.2.0",
|
||||
"base64ct",
|
||||
"crypto-bigint 0.5.5",
|
||||
"digest",
|
||||
"ff 0.13.0",
|
||||
@@ -1829,6 +1830,8 @@ dependencies = [
|
||||
"pkcs8 0.10.2",
|
||||
"rand_core 0.6.4",
|
||||
"sec1 0.7.3",
|
||||
"serde_json",
|
||||
"serdect",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -4037,6 +4040,8 @@ dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5256,6 +5261,7 @@ dependencies = [
|
||||
"der 0.7.8",
|
||||
"generic-array",
|
||||
"pkcs8 0.10.2",
|
||||
"serdect",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -5510,6 +5516,16 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serdect"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a84f14a19e9a014bb9f4512488d9829a68e04ecabffb0f9904cd1ace94598177"
|
||||
dependencies = [
|
||||
"base16ct 0.2.0",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.5"
|
||||
@@ -7302,6 +7318,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"parquet",
|
||||
"postgres-types",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"prost",
|
||||
@@ -7326,6 +7343,7 @@ dependencies = [
|
||||
"time",
|
||||
"time-macros",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
|
||||
@@ -109,13 +109,30 @@ RUN apt update && \
|
||||
libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \
|
||||
protobuf-c-compiler xsltproc
|
||||
|
||||
|
||||
# Postgis 3.5.0 requires SFCGAL 1.4+
|
||||
#
|
||||
# It would be nice to update all versions together, but we must solve the SFCGAL dependency first.
|
||||
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
mkdir -p /sfcgal && \
|
||||
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
|
||||
# and also we must check backward compatibility with older versions of PostGIS.
|
||||
#
|
||||
# Use new version only for v17
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export SFCGAL_VERSION=1.4.1 \
|
||||
export SFCGAL_CHECKSUM=1800c8a26241588f11cddcf433049e9b9aea902e923414d2ecef33a3295626c3 \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export SFCGAL_VERSION=1.3.10 \
|
||||
export SFCGAL_CHECKSUM=4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
|
||||
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
|
||||
mkdir -p /sfcgal && \
|
||||
wget https://gitlab.com/sfcgal/SFCGAL/-/archive/v${SFCGAL_VERSION}/SFCGAL-v${SFCGAL_VERSION}.tar.gz -O SFCGAL.tar.gz && \
|
||||
echo "${SFCGAL_CHECKSUM} SFCGAL.tar.gz" | sha256sum --check && \
|
||||
mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -123,15 +140,27 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
|
||||
ENV PATH="/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
|
||||
# Postgis 3.5.0 supports v17
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export POSTGIS_VERSION=3.5.0 \
|
||||
export POSTGIS_CHECKSUM=ca698a22cc2b2b3467ac4e063b43a28413f3004ddd505bdccdd74c56a647f510 \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export POSTGIS_VERSION=3.3.3 \
|
||||
export POSTGIS_CHECKSUM=74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://download.osgeo.org/postgis/source/postgis-3.3.3.tar.gz -O postgis.tar.gz && \
|
||||
echo "74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 postgis.tar.gz" | sha256sum --check && \
|
||||
wget https://download.osgeo.org/postgis/source/postgis-${POSTGIS_VERSION}.tar.gz -O postgis.tar.gz && \
|
||||
echo "${POSTGIS_CHECKSUM} postgis.tar.gz" | sha256sum --check && \
|
||||
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
|
||||
./autogen.sh && \
|
||||
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
cd extensions/postgis && \
|
||||
make clean && \
|
||||
@@ -152,11 +181,27 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
cp /usr/local/pgsql/share/extension/address_standardizer.control /extensions/postgis && \
|
||||
cp /usr/local/pgsql/share/extension/address_standardizer_data_us.control /extensions/postgis
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
# Uses versioned libraries, i.e. libpgrouting-3.4
|
||||
# and may introduce function signature changes between releases
|
||||
# i.e. release 3.5.0 has new signature for pg_dijkstra function
|
||||
#
|
||||
# Use new version only for v17
|
||||
# last release v3.6.2 - Mar 30, 2024
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export PGROUTING_VERSION=3.6.2 \
|
||||
export PGROUTING_CHECKSUM=f4a1ed79d6f714e52548eca3bb8e5593c6745f1bde92eb5fb858efd8984dffa2 \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export PGROUTING_VERSION=3.4.2 \
|
||||
export PGROUTING_CHECKSUM=cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
|
||||
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/pgRouting/pgrouting/archive/v${PGROUTING_VERSION}.tar.gz -O pgrouting.tar.gz && \
|
||||
echo "${PGROUTING_CHECKSUM} pgrouting.tar.gz" | sha256sum --check && \
|
||||
mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && cd build && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
@@ -215,10 +260,9 @@ FROM build-deps AS h3-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
mkdir -p /h3/usr/ && \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
# not version-specific
|
||||
# last release v4.1.0 - Jan 18, 2023
|
||||
RUN mkdir -p /h3/usr/ && \
|
||||
wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
|
||||
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
|
||||
mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \
|
||||
@@ -229,10 +273,9 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
cp -R /h3/usr / && \
|
||||
rm -rf build
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
|
||||
# not version-specific
|
||||
# last release v4.1.3 - Jul 26, 2023
|
||||
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
|
||||
echo "5c17f09a820859ffe949f847bebf1be98511fb8f1bd86f94932512c00479e324 h3-pg.tar.gz" | sha256sum --check && \
|
||||
mkdir h3-pg-src && cd h3-pg-src && tar xzf ../h3-pg.tar.gz --strip-components=1 -C . && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
@@ -251,11 +294,10 @@ FROM build-deps AS unit-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
|
||||
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
|
||||
# not version-specific
|
||||
# last release 7.9 - Sep 15, 2024
|
||||
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.9.tar.gz -O postgresql-unit.tar.gz && \
|
||||
echo "e46de6245dcc8b2c2ecf29873dbd43b2b346773f31dd5ce4b8315895a052b456 postgresql-unit.tar.gz" | sha256sum --check && \
|
||||
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -302,12 +344,10 @@ FROM build-deps AS pgjwt-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
|
||||
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
|
||||
# not version-specific
|
||||
# doesn't use releases, last commit f3d82fd - Mar 2, 2023
|
||||
RUN wget https://github.com/michelp/pgjwt/archive/f3d82fd30151e754e19ce5d6a06c71c20689ce3d.tar.gz -O pgjwt.tar.gz && \
|
||||
echo "dae8ed99eebb7593b43013f6532d772b12dfecd55548d2673f2dfd0163f6d2b9 pgjwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pgjwt-src && cd pgjwt-src && tar xzf ../pgjwt.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
|
||||
@@ -342,10 +382,9 @@ FROM build-deps AS pg-hashids-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
|
||||
# not version-specific
|
||||
# last release v1.2.1 -Jan 12, 2018
|
||||
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
|
||||
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_hashids-src && cd pg_hashids-src && tar xzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -405,10 +444,9 @@ FROM build-deps AS ip4r-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
|
||||
# not version-specific
|
||||
# last release v2.4.2 - Jul 29, 2023
|
||||
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
|
||||
echo "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b ip4r.tar.gz" | sha256sum --check && \
|
||||
mkdir ip4r-src && cd ip4r-src && tar xzf ../ip4r.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -425,10 +463,9 @@ FROM build-deps AS prefix-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
|
||||
# not version-specific
|
||||
# last release v1.2.10 - Jul 5, 2023
|
||||
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
|
||||
echo "4342f251432a5f6fb05b8597139d3ccde8dcf87e8ca1498e7ee931ca057a8575 prefix.tar.gz" | sha256sum --check && \
|
||||
mkdir prefix-src && cd prefix-src && tar xzf ../prefix.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -445,10 +482,9 @@ FROM build-deps AS hll-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
|
||||
# not version-specific
|
||||
# last release v2.18 - Aug 29, 2023
|
||||
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
|
||||
echo "e2f55a6f4c4ab95ee4f1b4a2b73280258c5136b161fe9d059559556079694f0e hll.tar.gz" | sha256sum --check && \
|
||||
mkdir hll-src && cd hll-src && tar xzf ../hll.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -659,11 +695,10 @@ FROM build-deps AS pg-roaringbitmap-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# not version-specific
|
||||
# last release v0.5.4 - Jun 28, 2022
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions is not supported yet by pg_roaringbitmap. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
|
||||
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
|
||||
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -680,12 +715,27 @@ FROM build-deps AS pg-semver-pg-build
|
||||
ARG PG_VERSION
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# Release 0.40.0 breaks backward compatibility with previous versions
|
||||
# see release note https://github.com/theory/pg-semver/releases/tag/v0.40.0
|
||||
# Use new version only for v17
|
||||
#
|
||||
# last release v0.40.0 - Jul 22, 2024
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 is not supported yet by pg_semver. Quit" && exit 0;; \
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export SEMVER_VERSION=0.40.0 \
|
||||
export SEMVER_CHECKSUM=3e50bcc29a0e2e481e7b6d2bc937cadc5f5869f55d983b5a1aafeb49f5425cfc \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export SEMVER_VERSION=0.32.1 \
|
||||
export SEMVER_CHECKSUM=fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
|
||||
echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/theory/pg-semver/archive/refs/tags/v${SEMVER_VERSION}.tar.gz -O pg_semver.tar.gz && \
|
||||
echo "${SEMVER_CHECKSUM} pg_semver.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_semver-src && cd pg_semver-src && tar xzf ../pg_semver.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
|
||||
@@ -1484,6 +1484,28 @@ LIMIT 100",
|
||||
info!("Pageserver config changed");
|
||||
}
|
||||
}
|
||||
|
||||
// Gather info about installed extensions
|
||||
pub fn get_installed_extensions(&self) -> Result<()> {
|
||||
let connstr = self.connstr.clone();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create runtime");
|
||||
let result = rt
|
||||
.block_on(crate::installed_extensions::get_installed_extensions(
|
||||
connstr,
|
||||
))
|
||||
.expect("failed to get installed extensions");
|
||||
|
||||
info!(
|
||||
"{}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn forward_termination_signal() {
|
||||
|
||||
@@ -165,6 +165,32 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// get the list of installed extensions
|
||||
// currently only used in python tests
|
||||
// TODO: call it from cplane
|
||||
(&Method::GET, "/installed_extensions") => {
|
||||
info!("serving /installed_extensions GET request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for extensions request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
return Response::new(Body::from(msg));
|
||||
}
|
||||
|
||||
let connstr = compute.connstr.clone();
|
||||
let res = crate::installed_extensions::get_installed_extensions(connstr).await;
|
||||
match res {
|
||||
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
|
||||
Err(e) => render_json_error(
|
||||
&format!("could not get list of installed extensions: {}", e),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from remote extension storage on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
|
||||
@@ -53,6 +53,20 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ComputeInsights"
|
||||
|
||||
/installed_extensions:
|
||||
get:
|
||||
tags:
|
||||
- Info
|
||||
summary: Get installed extensions.
|
||||
description: ""
|
||||
operationId: getInstalledExtensions
|
||||
responses:
|
||||
200:
|
||||
description: List of installed extensions
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/InstalledExtensions"
|
||||
/info:
|
||||
get:
|
||||
tags:
|
||||
@@ -395,6 +409,24 @@ components:
|
||||
- configuration
|
||||
example: running
|
||||
|
||||
InstalledExtensions:
|
||||
type: object
|
||||
properties:
|
||||
extensions:
|
||||
description: Contains list of installed extensions.
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
extname:
|
||||
type: string
|
||||
versions:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
n_databases:
|
||||
type: integer
|
||||
|
||||
#
|
||||
# Errors
|
||||
#
|
||||
|
||||
80
compute_tools/src/installed_extensions.rs
Normal file
80
compute_tools/src/installed_extensions.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use compute_api::responses::{InstalledExtension, InstalledExtensions};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use url::Url;
|
||||
|
||||
use anyhow::Result;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio::task;
|
||||
|
||||
/// We don't reuse get_existing_dbs() just for code clarity
|
||||
/// and to make database listing query here more explicit.
|
||||
///
|
||||
/// Limit the number of databases to 500 to avoid excessive load.
|
||||
fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
|
||||
// `pg_database.datconnlimit = -2` means that the database is in the
|
||||
// invalid state
|
||||
let databases = client
|
||||
.query(
|
||||
"SELECT datname FROM pg_catalog.pg_database
|
||||
WHERE datallowconn
|
||||
AND datconnlimit <> - 2
|
||||
LIMIT 500",
|
||||
&[],
|
||||
)?
|
||||
.iter()
|
||||
.map(|row| {
|
||||
let db: String = row.get("datname");
|
||||
db
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(databases)
|
||||
}
|
||||
|
||||
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
|
||||
/// Same extension can be installed in multiple databases with different versions,
|
||||
/// we only keep the highest and lowest version across all databases.
|
||||
pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
|
||||
let mut connstr = connstr.clone();
|
||||
|
||||
task::spawn_blocking(move || {
|
||||
let mut client = Client::connect(connstr.as_str(), NoTls)?;
|
||||
let databases: Vec<String> = list_dbs(&mut client)?;
|
||||
|
||||
let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
|
||||
for db in databases.iter() {
|
||||
connstr.set_path(db);
|
||||
let mut db_client = Client::connect(connstr.as_str(), NoTls)?;
|
||||
let extensions: Vec<(String, String)> = db_client
|
||||
.query(
|
||||
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
|
||||
&[],
|
||||
)?
|
||||
.iter()
|
||||
.map(|row| (row.get("extname"), row.get("extversion")))
|
||||
.collect();
|
||||
|
||||
for (extname, v) in extensions.iter() {
|
||||
let version = v.to_string();
|
||||
extensions_map
|
||||
.entry(extname.to_string())
|
||||
.and_modify(|e| {
|
||||
e.versions.insert(version.clone());
|
||||
// count the number of databases where the extension is installed
|
||||
e.n_databases += 1;
|
||||
})
|
||||
.or_insert(InstalledExtension {
|
||||
extname: extname.to_string(),
|
||||
versions: HashSet::from([version.clone()]),
|
||||
n_databases: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InstalledExtensions {
|
||||
extensions: extensions_map.values().cloned().collect(),
|
||||
})
|
||||
})
|
||||
.await?
|
||||
}
|
||||
@@ -15,6 +15,7 @@ pub mod catalog;
|
||||
pub mod compute;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod installed_extensions;
|
||||
pub mod local_proxy;
|
||||
pub mod lsn_lease;
|
||||
mod migration;
|
||||
|
||||
@@ -318,12 +318,26 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
|
||||
name.pg_quote()
|
||||
);
|
||||
// If the role we're creating is intended for JWT login, we do
|
||||
// not give it any attributes.
|
||||
if jwks_roles.contains(name.as_str()) {
|
||||
query = format!("CREATE ROLE {}", name.pg_quote());
|
||||
}
|
||||
info!("running role create query: '{}'", &query);
|
||||
query.push_str(&role.to_pg_options());
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
|
||||
// If the role we're creating is intended for JWT login, we have
|
||||
// to make sure it can execute functions in the auth schema.
|
||||
if jwks_roles.contains(name.as_str()) {
|
||||
let mut grant_query = format!(
|
||||
"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA auth TO {}",
|
||||
name.pg_quote()
|
||||
);
|
||||
info!("running grant query for JWT role: '{}'", &grant_query);
|
||||
grant_query.push_str(&role.to_pg_options());
|
||||
xact.execute(grant_query.as_str(), &[])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
@@ -155,3 +156,15 @@ pub enum ControlPlaneComputeStatus {
|
||||
// should be able to start with provided spec.
|
||||
Attached,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct InstalledExtension {
|
||||
pub extname: String,
|
||||
pub versions: HashSet<String>,
|
||||
pub n_databases: u32, // Number of databases using this extension
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct InstalledExtensions {
|
||||
pub extensions: Vec<InstalledExtension>,
|
||||
}
|
||||
|
||||
@@ -104,8 +104,7 @@ pub struct ConfigToml {
|
||||
pub image_compression: ImageCompressionAlgorithm,
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
pub l0_flush: Option<crate::models::L0FlushConfig>,
|
||||
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
|
||||
pub io_buffer_alignment: usize,
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -388,10 +387,7 @@ impl Default for ConfigToml {
|
||||
image_compression: (DEFAULT_IMAGE_COMPRESSION),
|
||||
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: None,
|
||||
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
|
||||
|
||||
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
|
||||
virtual_file_io_mode: None,
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
|
||||
}
|
||||
|
||||
pub mod virtual_file {
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
@@ -994,50 +992,45 @@ pub mod virtual_file {
|
||||
}
|
||||
|
||||
/// Direct IO modes for a pageserver.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum DirectIoMode {
|
||||
/// Direct IO disabled (uses usual buffered IO).
|
||||
#[default]
|
||||
Disabled,
|
||||
/// Direct IO disabled (performs checks and perf simulations).
|
||||
Evaluate {
|
||||
/// Alignment check level
|
||||
alignment_check: DirectIoAlignmentCheckLevel,
|
||||
/// Latency padded for performance simulation.
|
||||
latency_padding: DirectIoLatencyPadding,
|
||||
},
|
||||
/// Direct IO enabled.
|
||||
Enabled {
|
||||
/// Actions to perform on alignment error.
|
||||
on_alignment_error: DirectIoOnAlignmentErrorAction,
|
||||
},
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[repr(u8)]
|
||||
pub enum IoMode {
|
||||
/// Uses buffered IO.
|
||||
Buffered,
|
||||
/// Uses direct IO, error out if the operation fails.
|
||||
#[cfg(target_os = "linux")]
|
||||
Direct,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum DirectIoAlignmentCheckLevel {
|
||||
#[default]
|
||||
Error,
|
||||
Log,
|
||||
None,
|
||||
impl IoMode {
|
||||
pub const fn preferred() -> Self {
|
||||
Self::Buffered
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum DirectIoOnAlignmentErrorAction {
|
||||
Error,
|
||||
#[default]
|
||||
FallbackToBuffered,
|
||||
}
|
||||
impl TryFrom<u8> for IoMode {
|
||||
type Error = u8;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
|
||||
#[serde(tag = "type", rename_all = "kebab-case")]
|
||||
pub enum DirectIoLatencyPadding {
|
||||
/// Pad virtual file operations with IO to a fake file.
|
||||
FakeFileRW { path: PathBuf },
|
||||
#[default]
|
||||
None,
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::Direct as u8) => IoMode::Direct,
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -496,26 +496,12 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
|
||||
}
|
||||
|
||||
self.download_for_builder(builder, cancel).await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
|
||||
|
||||
let mut builder = blob_client.get();
|
||||
|
||||
let range: Range = if let Some(end_exclusive) = end_exclusive {
|
||||
(start_inclusive..end_exclusive).into()
|
||||
} else {
|
||||
(start_inclusive..).into()
|
||||
};
|
||||
builder = builder.range(range);
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
builder = builder.range(match end {
|
||||
Some(end) => Range::Range(start..end),
|
||||
None => Range::RangeFrom(start..),
|
||||
});
|
||||
}
|
||||
|
||||
self.download_for_builder(builder, cancel).await
|
||||
}
|
||||
|
||||
@@ -19,7 +19,8 @@ mod simulate_failures;
|
||||
mod support;
|
||||
|
||||
use std::{
|
||||
collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
|
||||
collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -162,11 +163,60 @@ pub struct Listing {
|
||||
}
|
||||
|
||||
/// Options for downloads. The default value is a plain GET.
|
||||
#[derive(Default)]
|
||||
pub struct DownloadOpts {
|
||||
/// If given, returns [`DownloadError::Unmodified`] if the object still has
|
||||
/// the same ETag (using If-None-Match).
|
||||
pub etag: Option<Etag>,
|
||||
/// The start of the byte range to download, or unbounded.
|
||||
pub byte_start: Bound<u64>,
|
||||
/// The end of the byte range to download, or unbounded. Must be after the
|
||||
/// start bound.
|
||||
pub byte_end: Bound<u64>,
|
||||
}
|
||||
|
||||
impl Default for DownloadOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
etag: Default::default(),
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DownloadOpts {
|
||||
/// Returns the byte range with inclusive start and exclusive end, or None
|
||||
/// if unbounded.
|
||||
pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
|
||||
if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
|
||||
return None;
|
||||
}
|
||||
let start = match self.byte_start {
|
||||
Bound::Excluded(i) => i + 1,
|
||||
Bound::Included(i) => i,
|
||||
Bound::Unbounded => 0,
|
||||
};
|
||||
let end = match self.byte_end {
|
||||
Bound::Excluded(i) => Some(i),
|
||||
Bound::Included(i) => Some(i + 1),
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
if let Some(end) = end {
|
||||
assert!(start < end, "range end {end} at or before start {start}");
|
||||
}
|
||||
Some((start, end))
|
||||
}
|
||||
|
||||
/// Returns the byte range as an RFC 2616 Range header value with inclusive
|
||||
/// bounds, or None if unbounded.
|
||||
pub fn byte_range_header(&self) -> Option<String> {
|
||||
self.byte_range()
|
||||
.map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
|
||||
.map(|(start, end)| match end {
|
||||
Some(end) => format!("bytes={start}-{end}"),
|
||||
None => format!("bytes={start}-"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -257,21 +307,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Streams a given byte range of the remote storage entry contents.
|
||||
///
|
||||
/// The returned download stream will obey initial timeout and cancellation signal by erroring
|
||||
/// on whichever happens first. Only one of the reasons will fail the stream, which is usually
|
||||
/// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
|
||||
///
|
||||
/// Returns the metadata, if any was stored with the file previously.
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
/// Delete a single path from remote storage.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -425,33 +460,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete`]
|
||||
pub async fn delete(
|
||||
&self,
|
||||
@@ -573,20 +581,6 @@ impl GenericRemoteStorage {
|
||||
})
|
||||
}
|
||||
|
||||
/// Downloads the storage object into the `to_path` provided.
|
||||
/// `byte_range` could be specified to dowload only a part of the file, if needed.
|
||||
pub async fn download_storage_object(
|
||||
&self,
|
||||
byte_range: Option<(u64, Option<u64>)>,
|
||||
from: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
match byte_range {
|
||||
Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
|
||||
None => self.download(from, &DownloadOpts::default(), cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// The name of the bucket/container/etc.
|
||||
pub fn bucket_name(&self) -> Option<&str> {
|
||||
match self {
|
||||
@@ -660,6 +654,76 @@ impl ConcurrencyLimiter {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
|
||||
/// with optional end bound, or None when unbounded.
|
||||
#[test]
|
||||
fn download_opts_byte_range() {
|
||||
// Consider using test_case or a similar table-driven test framework.
|
||||
let cases = [
|
||||
// (byte_start, byte_end, expected)
|
||||
(Bound::Unbounded, Bound::Unbounded, None),
|
||||
(Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
|
||||
(Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
|
||||
(Bound::Included(3), Bound::Unbounded, Some((3, None))),
|
||||
(Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
|
||||
(Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
|
||||
(Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
|
||||
(Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
|
||||
(Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
|
||||
// 1-sized ranges are fine, 0 aren't and will panic (separate test).
|
||||
(Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
|
||||
(Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
|
||||
];
|
||||
|
||||
for (byte_start, byte_end, expect) in cases {
|
||||
let opts = DownloadOpts {
|
||||
byte_start,
|
||||
byte_end,
|
||||
..Default::default()
|
||||
};
|
||||
let result = opts.byte_range();
|
||||
assert_eq!(
|
||||
result, expect,
|
||||
"byte_start={byte_start:?} byte_end={byte_end:?}"
|
||||
);
|
||||
|
||||
// Check generated HTTP header, which uses an inclusive range.
|
||||
let expect_header = expect.map(|(start, end)| match end {
|
||||
Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
|
||||
None => format!("bytes={start}-"),
|
||||
});
|
||||
assert_eq!(
|
||||
opts.byte_range_header(),
|
||||
expect_header,
|
||||
"byte_start={byte_start:?} byte_end={byte_end:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// DownloadOpts::byte_range() zero-sized byte range should panic.
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn download_opts_byte_range_zero() {
|
||||
DownloadOpts {
|
||||
byte_start: Bound::Included(3),
|
||||
byte_end: Bound::Excluded(3),
|
||||
..Default::default()
|
||||
}
|
||||
.byte_range();
|
||||
}
|
||||
|
||||
/// DownloadOpts::byte_range() negative byte range should panic.
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn download_opts_byte_range_negative() {
|
||||
DownloadOpts {
|
||||
byte_start: Bound::Included(3),
|
||||
byte_end: Bound::Included(2),
|
||||
..Default::default()
|
||||
}
|
||||
.byte_range();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_object_name() {
|
||||
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
|
||||
|
||||
@@ -506,54 +506,7 @@ impl RemoteStorage for LocalFs {
|
||||
return Err(DownloadError::Unmodified);
|
||||
}
|
||||
|
||||
let source = ReaderStream::new(
|
||||
fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open source file {target_path:?} to use in the download")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
|
||||
let metadata = self
|
||||
.read_storage_metadata(&target_path)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
|
||||
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
|
||||
|
||||
Ok(Download {
|
||||
metadata,
|
||||
last_modified: file_metadata
|
||||
.modified()
|
||||
.map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
|
||||
etag,
|
||||
download_stream: Box::pin(source),
|
||||
})
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
if let Some(end_exclusive) = end_exclusive {
|
||||
if end_exclusive <= start_inclusive {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
|
||||
};
|
||||
if start_inclusive == end_exclusive.saturating_sub(1) {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
|
||||
}
|
||||
}
|
||||
|
||||
let target_path = from.with_base(&self.storage_root);
|
||||
let file_metadata = file_metadata(&target_path).await?;
|
||||
let mut source = tokio::fs::OpenOptions::new()
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&target_path)
|
||||
.await
|
||||
@@ -562,31 +515,29 @@ impl RemoteStorage for LocalFs {
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let len = source
|
||||
.metadata()
|
||||
.await
|
||||
.context("query file length")
|
||||
.map_err(DownloadError::Other)?
|
||||
.len();
|
||||
let mut take = file_metadata.len();
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
if start > 0 {
|
||||
file.seek(io::SeekFrom::Start(start))
|
||||
.await
|
||||
.context("Failed to seek to the range start in a local storage file")
|
||||
.map_err(DownloadError::Other)?;
|
||||
}
|
||||
if let Some(end) = end {
|
||||
take = end - start;
|
||||
}
|
||||
}
|
||||
|
||||
source
|
||||
.seek(io::SeekFrom::Start(start_inclusive))
|
||||
.await
|
||||
.context("Failed to seek to the range start in a local storage file")
|
||||
.map_err(DownloadError::Other)?;
|
||||
let source = ReaderStream::new(file.take(take));
|
||||
|
||||
let metadata = self
|
||||
.read_storage_metadata(&target_path)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
|
||||
let source = ReaderStream::new(source);
|
||||
|
||||
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
|
||||
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
|
||||
|
||||
let etag = mock_etag(&file_metadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
last_modified: file_metadata
|
||||
@@ -688,7 +639,7 @@ mod fs_tests {
|
||||
use super::*;
|
||||
|
||||
use camino_tempfile::tempdir;
|
||||
use std::{collections::HashMap, io::Write};
|
||||
use std::{collections::HashMap, io::Write, ops::Bound};
|
||||
|
||||
async fn read_and_check_metadata(
|
||||
storage: &LocalFs,
|
||||
@@ -804,10 +755,12 @@ mod fs_tests {
|
||||
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
|
||||
|
||||
let first_part_download = storage
|
||||
.download_byte_range(
|
||||
.download(
|
||||
&upload_target,
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&DownloadOpts {
|
||||
byte_end: Bound::Excluded(first_part_local.len() as u64),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
@@ -823,10 +776,15 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
let second_part_download = storage
|
||||
.download_byte_range(
|
||||
.download(
|
||||
&upload_target,
|
||||
first_part_local.len() as u64,
|
||||
Some((first_part_local.len() + second_part_local.len()) as u64),
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(first_part_local.len() as u64),
|
||||
byte_end: Bound::Excluded(
|
||||
(first_part_local.len() + second_part_local.len()) as u64,
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
@@ -842,7 +800,14 @@ mod fs_tests {
|
||||
);
|
||||
|
||||
let suffix_bytes = storage
|
||||
.download_byte_range(&upload_target, 13, None, &cancel)
|
||||
.download(
|
||||
&upload_target,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(13),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.download_stream;
|
||||
let suffix_bytes = aggregate(suffix_bytes).await?;
|
||||
@@ -850,7 +815,7 @@ mod fs_tests {
|
||||
assert_eq!(upload_name, suffix);
|
||||
|
||||
let all_bytes = storage
|
||||
.download_byte_range(&upload_target, 0, None, &cancel)
|
||||
.download(&upload_target, &DownloadOpts::default(), &cancel)
|
||||
.await?
|
||||
.download_stream;
|
||||
let all_bytes = aggregate(all_bytes).await?;
|
||||
@@ -861,48 +826,26 @@ mod fs_tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_file_range_negative() -> anyhow::Result<()> {
|
||||
let (storage, cancel) = create_storage()?;
|
||||
#[should_panic(expected = "at or before start")]
|
||||
async fn download_file_range_negative() {
|
||||
let (storage, cancel) = create_storage().unwrap();
|
||||
let upload_name = "upload_1";
|
||||
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
|
||||
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let start = 1_000_000_000;
|
||||
let end = start + 1;
|
||||
match storage
|
||||
.download_byte_range(
|
||||
storage
|
||||
.download(
|
||||
&upload_target,
|
||||
start,
|
||||
Some(end), // exclusive end
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(10),
|
||||
byte_end: Bound::Excluded(10),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("zero bytes"));
|
||||
assert!(error_string.contains(&start.to_string()));
|
||||
assert!(error_string.contains(&end.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let start = 10000;
|
||||
let end = 234;
|
||||
assert!(start > end, "Should test an incorrect range");
|
||||
match storage
|
||||
.download_byte_range(&upload_target, start, Some(end), &cancel)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("Should not allow downloading wrong ranges"),
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("Invalid range"));
|
||||
assert!(error_string.contains(&start.to_string()));
|
||||
assert!(error_string.contains(&end.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -945,10 +888,12 @@ mod fs_tests {
|
||||
let (first_part_local, _) = uploaded_bytes.split_at(3);
|
||||
|
||||
let partial_download_with_metadata = storage
|
||||
.download_byte_range(
|
||||
.download(
|
||||
&upload_target,
|
||||
0,
|
||||
Some(first_part_local.len() as u64),
|
||||
&DownloadOpts {
|
||||
byte_end: Bound::Excluded(first_part_local.len() as u64),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -804,34 +804,7 @@ impl RemoteStorage for S3Bucket {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: None,
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
|
||||
// and needs both ends to be exclusive
|
||||
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
|
||||
let range = Some(match end_inclusive {
|
||||
Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
|
||||
None => format!("bytes={start_inclusive}-"),
|
||||
});
|
||||
|
||||
self.download_object(
|
||||
GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: None,
|
||||
range,
|
||||
range: opts.byte_range_header(),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -170,28 +170,13 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
opts: &DownloadOpts,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// Note: We treat any byte range as an "attempt" of the same operation.
|
||||
// We don't pay attention to the ranges. That's good enough for now.
|
||||
self.attempt(RemoteOp::Download(from.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner.download(from, opts, cancel).await
|
||||
}
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Download, DownloadError> {
|
||||
// Note: We treat any download_byte_range as an "attempt" of the same
|
||||
// operation. We don't pay attention to the ranges. That's good enough
|
||||
// for now.
|
||||
self.attempt(RemoteOp::Download(from.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner
|
||||
.download_byte_range(from, start_inclusive, end_exclusive, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
self.delete_inner(path, true, cancel).await
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, num::NonZeroU32};
|
||||
use test_context::test_context;
|
||||
@@ -293,7 +294,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Full range (end specified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download_byte_range(&path, 0, Some(len as u64), &cancel)
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(0),
|
||||
byte_end: Bound::Excluded(len as u64),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
@@ -301,7 +310,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// partial range (end specified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download_byte_range(&path, 4, Some(10), &cancel)
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(4),
|
||||
byte_end: Bound::Excluded(10),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[4..10]);
|
||||
@@ -309,7 +326,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// partial range (end beyond real end)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(8),
|
||||
byte_end: Bound::Excluded(len as u64 * 100),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[8..]);
|
||||
@@ -317,7 +342,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Partial range (end unspecified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download_byte_range(&path, 4, None, &cancel)
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(4),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig[4..]);
|
||||
@@ -325,7 +357,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
|
||||
// Full range (end unspecified)
|
||||
let dl = ctx
|
||||
.client
|
||||
.download_byte_range(&path, 0, None, &cancel)
|
||||
.download(
|
||||
&path,
|
||||
&DownloadOpts {
|
||||
byte_start: Bound::Included(0),
|
||||
..Default::default()
|
||||
},
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
@@ -164,11 +164,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(
|
||||
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
|
||||
));
|
||||
virtual_file::init(
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
virtual_file::init(16384, virtual_file::io_engine_for_bench());
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
{
|
||||
|
||||
@@ -540,10 +540,13 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Configs io buffer alignment at runtime.
|
||||
pub async fn put_io_alignment(&self, align: usize) -> Result<()> {
|
||||
let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, align)
|
||||
/// Configs io mode at runtime.
|
||||
pub async fn put_io_mode(
|
||||
&self,
|
||||
mode: &pageserver_api::models::virtual_file::IoMode,
|
||||
) -> Result<()> {
|
||||
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, mode)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
|
||||
@@ -152,11 +152,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let file = VirtualFile::open(path, ctx).await?;
|
||||
let file_id = page_cache::next_file_id();
|
||||
@@ -190,11 +190,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -26,7 +26,7 @@ use pageserver::{
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::ControlFileData;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -205,11 +205,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(
|
||||
10,
|
||||
virtual_file::api::IoEngineKind::StdFs,
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT,
|
||||
);
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
|
||||
@@ -59,9 +59,9 @@ pub(crate) struct Args {
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
|
||||
|
||||
/// Before starting the benchmark, live-reconfigure the pageserver to use specified alignment for io buffers.
|
||||
/// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
|
||||
#[clap(long)]
|
||||
set_io_alignment: Option<usize>,
|
||||
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
|
||||
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
@@ -129,8 +129,8 @@ async fn main_impl(
|
||||
mgmt_api_client.put_io_engine(engine_str).await?;
|
||||
}
|
||||
|
||||
if let Some(align) = args.set_io_alignment {
|
||||
mgmt_api_client.put_io_alignment(align).await?;
|
||||
if let Some(mode) = &args.set_io_mode {
|
||||
mgmt_api_client.put_io_mode(mode).await?;
|
||||
}
|
||||
|
||||
// discover targets
|
||||
|
||||
@@ -125,8 +125,7 @@ fn main() -> anyhow::Result<()> {
|
||||
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
|
||||
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
@@ -168,11 +167,7 @@ fn main() -> anyhow::Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(
|
||||
conf.max_file_descriptors,
|
||||
conf.virtual_file_io_engine,
|
||||
conf.io_buffer_alignment,
|
||||
);
|
||||
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
@@ -174,9 +174,7 @@ pub struct PageServerConf {
|
||||
pub l0_flush: crate::l0_flush::L0FlushConfig,
|
||||
|
||||
/// Direct IO settings
|
||||
pub virtual_file_direct_io: virtual_file::DirectIoMode,
|
||||
|
||||
pub io_buffer_alignment: usize,
|
||||
pub virtual_file_io_mode: virtual_file::IoMode,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -325,11 +323,10 @@ impl PageServerConf {
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
l0_flush,
|
||||
virtual_file_direct_io,
|
||||
virtual_file_io_mode,
|
||||
concurrent_tenant_warmup,
|
||||
concurrent_tenant_size_logical_size_queries,
|
||||
virtual_file_io_engine,
|
||||
io_buffer_alignment,
|
||||
tenant_config,
|
||||
} = config_toml;
|
||||
|
||||
@@ -368,8 +365,6 @@ impl PageServerConf {
|
||||
max_vectored_read_bytes,
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
virtual_file_direct_io,
|
||||
io_buffer_alignment,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
@@ -408,6 +403,7 @@ impl PageServerConf {
|
||||
l0_flush: l0_flush
|
||||
.map(crate::l0_flush::L0FlushConfig::from)
|
||||
.unwrap_or_default(),
|
||||
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -17,6 +17,7 @@ use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
|
||||
use pageserver_api::models::IngestAuxFilesRequest;
|
||||
@@ -2381,17 +2382,13 @@ async fn put_io_engine_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn put_io_alignment_handler(
|
||||
async fn put_io_mode_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&r, None)?;
|
||||
let align: usize = json_request(&mut r).await?;
|
||||
crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| {
|
||||
ApiError::PreconditionFailed(
|
||||
format!("Requested io alignment ({align}) is not a power of two").into(),
|
||||
)
|
||||
})?;
|
||||
let mode: IoMode = json_request(&mut r).await?;
|
||||
crate::virtual_file::set_io_mode(mode);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -3082,9 +3079,7 @@ pub fn make_router(
|
||||
|r| api_handler(r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.put("/v1/io_alignment", |r| {
|
||||
api_handler(r, put_io_alignment_handler)
|
||||
})
|
||||
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|
||||
|r| api_handler(r, force_aux_policy_switch_handler),
|
||||
|
||||
@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = &self.buffered_writer.as_inner().as_inner().path;
|
||||
let path = self.buffered_writer.as_inner().as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
@@ -356,7 +356,7 @@ mod tests {
|
||||
}
|
||||
|
||||
let file_contents =
|
||||
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
|
||||
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
|
||||
assert_eq!(file_contents, &content[0..cap]);
|
||||
|
||||
let buffer_contents = file.buffered_writer.inspect_buffer();
|
||||
@@ -392,7 +392,7 @@ mod tests {
|
||||
.buffered_writer
|
||||
.as_inner()
|
||||
.as_inner()
|
||||
.path
|
||||
.path()
|
||||
.metadata()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
|
||||
@@ -950,6 +950,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let cancel = &self.secondary_state.cancel;
|
||||
let opts = DownloadOpts {
|
||||
etag: prev_etag.cloned(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
backoff::retry(
|
||||
|
||||
@@ -573,7 +573,7 @@ impl DeltaLayerWriterInner {
|
||||
ensure!(
|
||||
metadata.len() <= S3_UPLOAD_LIMIT,
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path,
|
||||
file.path(),
|
||||
metadata.len()
|
||||
);
|
||||
|
||||
@@ -791,7 +791,7 @@ impl DeltaLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
let file = VirtualFile::open_v2(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
|
||||
@@ -1022,7 +1022,7 @@ impl DeltaLayerInner {
|
||||
blob_meta.key,
|
||||
PageReconstructError::Other(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
self.file.path(),
|
||||
kind
|
||||
)),
|
||||
);
|
||||
@@ -1048,7 +1048,7 @@ impl DeltaLayerInner {
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to decompress blob from virtual file {}",
|
||||
self.file.path,
|
||||
self.file.path(),
|
||||
))),
|
||||
);
|
||||
|
||||
@@ -1066,7 +1066,7 @@ impl DeltaLayerInner {
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
self.file.path,
|
||||
self.file.path(),
|
||||
))),
|
||||
);
|
||||
|
||||
@@ -1198,7 +1198,6 @@ impl DeltaLayerInner {
|
||||
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
|
||||
|
||||
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
|
||||
let align = virtual_file::get_io_buffer_alignment();
|
||||
|
||||
let max_read_size = self
|
||||
.max_vectored_read_bytes
|
||||
@@ -1247,7 +1246,6 @@ impl DeltaLayerInner {
|
||||
offsets.end.pos(),
|
||||
meta,
|
||||
max_read_size,
|
||||
align,
|
||||
))
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -389,7 +389,7 @@ impl ImageLayerInner {
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
let file = VirtualFile::open_v2(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
let file_id = page_cache::next_file_id();
|
||||
@@ -626,7 +626,7 @@ impl ImageLayerInner {
|
||||
meta.meta.key,
|
||||
PageReconstructError::Other(anyhow!(e).context(format!(
|
||||
"Failed to decompress blob from virtual file {}",
|
||||
self.file.path,
|
||||
self.file.path(),
|
||||
))),
|
||||
);
|
||||
|
||||
@@ -647,7 +647,7 @@ impl ImageLayerInner {
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.file.path,
|
||||
self.file.path(),
|
||||
kind
|
||||
)),
|
||||
);
|
||||
|
||||
@@ -194,8 +194,6 @@ pub(crate) struct ChunkedVectoredReadBuilder {
|
||||
/// Start offset and metadata for each blob in this read
|
||||
blobs_at: VecMap<u64, BlobMeta>,
|
||||
max_read_size: Option<usize>,
|
||||
/// Chunk size reads are coalesced into.
|
||||
chunk_size: usize,
|
||||
}
|
||||
|
||||
/// Computes x / d rounded up.
|
||||
@@ -204,6 +202,7 @@ fn div_round_up(x: usize, d: usize) -> usize {
|
||||
}
|
||||
|
||||
impl ChunkedVectoredReadBuilder {
|
||||
const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
|
||||
/// Start building a new vectored read.
|
||||
///
|
||||
/// Note that by design, this does not check against reading more than `max_read_size` to
|
||||
@@ -214,21 +213,19 @@ impl ChunkedVectoredReadBuilder {
|
||||
end_offset: u64,
|
||||
meta: BlobMeta,
|
||||
max_read_size: Option<usize>,
|
||||
chunk_size: usize,
|
||||
) -> Self {
|
||||
let mut blobs_at = VecMap::default();
|
||||
blobs_at
|
||||
.append(start_offset, meta)
|
||||
.expect("First insertion always succeeds");
|
||||
|
||||
let start_blk_no = start_offset as usize / chunk_size;
|
||||
let end_blk_no = div_round_up(end_offset as usize, chunk_size);
|
||||
let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
|
||||
let end_blk_no = div_round_up(end_offset as usize, Self::CHUNK_SIZE);
|
||||
Self {
|
||||
start_blk_no,
|
||||
end_blk_no,
|
||||
blobs_at,
|
||||
max_read_size,
|
||||
chunk_size,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,18 +234,12 @@ impl ChunkedVectoredReadBuilder {
|
||||
end_offset: u64,
|
||||
meta: BlobMeta,
|
||||
max_read_size: usize,
|
||||
align: usize,
|
||||
) -> Self {
|
||||
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), align)
|
||||
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
|
||||
}
|
||||
|
||||
pub(crate) fn new_streaming(
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
meta: BlobMeta,
|
||||
align: usize,
|
||||
) -> Self {
|
||||
Self::new_impl(start_offset, end_offset, meta, None, align)
|
||||
pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
|
||||
Self::new_impl(start_offset, end_offset, meta, None)
|
||||
}
|
||||
|
||||
/// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
|
||||
@@ -256,12 +247,12 @@ impl ChunkedVectoredReadBuilder {
|
||||
/// The resulting size also must be below the max read size.
|
||||
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
tracing::trace!(start, end, "trying to extend");
|
||||
let start_blk_no = start as usize / self.chunk_size;
|
||||
let end_blk_no = div_round_up(end as usize, self.chunk_size);
|
||||
let start_blk_no = start as usize / Self::CHUNK_SIZE;
|
||||
let end_blk_no = div_round_up(end as usize, Self::CHUNK_SIZE);
|
||||
|
||||
let not_limited_by_max_read_size = {
|
||||
if let Some(max_read_size) = self.max_read_size {
|
||||
let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
|
||||
let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
|
||||
coalesced_size <= max_read_size
|
||||
} else {
|
||||
true
|
||||
@@ -292,12 +283,12 @@ impl ChunkedVectoredReadBuilder {
|
||||
}
|
||||
|
||||
pub(crate) fn size(&self) -> usize {
|
||||
(self.end_blk_no - self.start_blk_no) * self.chunk_size
|
||||
(self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
|
||||
}
|
||||
|
||||
pub(crate) fn build(self) -> VectoredRead {
|
||||
let start = (self.start_blk_no * self.chunk_size) as u64;
|
||||
let end = (self.end_blk_no * self.chunk_size) as u64;
|
||||
let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
|
||||
let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
|
||||
VectoredRead {
|
||||
start,
|
||||
end,
|
||||
@@ -328,18 +319,14 @@ pub struct VectoredReadPlanner {
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag)>,
|
||||
|
||||
max_read_size: usize,
|
||||
|
||||
align: usize,
|
||||
}
|
||||
|
||||
impl VectoredReadPlanner {
|
||||
pub fn new(max_read_size: usize) -> Self {
|
||||
let align = virtual_file::get_io_buffer_alignment();
|
||||
Self {
|
||||
blobs: BTreeMap::new(),
|
||||
prev: None,
|
||||
max_read_size,
|
||||
align,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,7 +405,6 @@ impl VectoredReadPlanner {
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
self.max_read_size,
|
||||
self.align,
|
||||
);
|
||||
|
||||
let prev_read_builder = current_read_builder.replace(next_read_builder);
|
||||
@@ -472,13 +458,13 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
let align = virtual_file::get_io_buffer_alignment() as u64;
|
||||
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
|
||||
debug_assert_eq!(
|
||||
read.start % align,
|
||||
read.start % ALIGN,
|
||||
0,
|
||||
"Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
|
||||
read.start,
|
||||
align
|
||||
ALIGN
|
||||
);
|
||||
}
|
||||
|
||||
@@ -553,22 +539,18 @@ pub struct StreamingVectoredReadPlanner {
|
||||
max_cnt: usize,
|
||||
/// Size of the current batch
|
||||
cnt: usize,
|
||||
|
||||
align: usize,
|
||||
}
|
||||
|
||||
impl StreamingVectoredReadPlanner {
|
||||
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
|
||||
assert!(max_cnt > 0);
|
||||
assert!(max_read_size > 0);
|
||||
let align = virtual_file::get_io_buffer_alignment();
|
||||
Self {
|
||||
read_builder: None,
|
||||
prev: None,
|
||||
max_cnt,
|
||||
max_read_size,
|
||||
cnt: 0,
|
||||
align,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,7 +603,6 @@ impl StreamingVectoredReadPlanner {
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
self.align,
|
||||
))
|
||||
};
|
||||
}
|
||||
@@ -656,9 +637,9 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
|
||||
let align = virtual_file::get_io_buffer_alignment() as u64;
|
||||
assert_eq!(read.start % align, 0);
|
||||
assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
|
||||
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
|
||||
assert_eq!(read.start % ALIGN, 0);
|
||||
assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
|
||||
|
||||
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
|
||||
|
||||
@@ -676,32 +657,27 @@ mod tests {
|
||||
fn planner_chunked_coalesce_all_test() {
|
||||
use crate::virtual_file;
|
||||
|
||||
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
|
||||
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
|
||||
|
||||
// The test explicitly does not check chunk size < 512
|
||||
if chunk_size < 512 {
|
||||
return;
|
||||
}
|
||||
|
||||
let max_read_size = chunk_size as usize * 8;
|
||||
let max_read_size = CHUNK_SIZE as usize * 8;
|
||||
let key = Key::MIN;
|
||||
let lsn = Lsn(0);
|
||||
|
||||
let blob_descriptions = [
|
||||
(key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
|
||||
(key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, chunk_size / 2, BlobFlag::None),
|
||||
(key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, chunk_size, BlobFlag::None),
|
||||
(key, lsn, chunk_size * 2 - 1, BlobFlag::None),
|
||||
(key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, chunk_size * 3 + 1, BlobFlag::None),
|
||||
(key, lsn, chunk_size * 5 + 1, BlobFlag::None),
|
||||
(key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
|
||||
(key, lsn, chunk_size * 7 + 1, BlobFlag::None),
|
||||
(key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
|
||||
(key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
|
||||
(key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
|
||||
(key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
|
||||
(key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, CHUNK_SIZE, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
|
||||
(key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
|
||||
(key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
|
||||
(key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
|
||||
(key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
|
||||
(key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
|
||||
];
|
||||
|
||||
let ranges = [
|
||||
@@ -780,19 +756,19 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn planner_replacement_test() {
|
||||
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
|
||||
let max_read_size = 128 * chunk_size as usize;
|
||||
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
|
||||
let max_read_size = 128 * CHUNK_SIZE as usize;
|
||||
let first_key = Key::MIN;
|
||||
let second_key = first_key.next();
|
||||
let lsn = Lsn(0);
|
||||
|
||||
let blob_descriptions = vec![
|
||||
(first_key, lsn, 0, BlobFlag::None), // First in read 1
|
||||
(first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
|
||||
(second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
|
||||
(second_key, lsn, 3 * chunk_size, BlobFlag::None),
|
||||
(second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
|
||||
(second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
|
||||
(first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
|
||||
(second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
|
||||
(second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
|
||||
(second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
|
||||
(second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
|
||||
];
|
||||
|
||||
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
|
||||
@@ -802,7 +778,7 @@ mod tests {
|
||||
planner.handle(key, lsn, offset, flag);
|
||||
}
|
||||
|
||||
planner.handle_range_end(6 * chunk_size);
|
||||
planner.handle_range_end(6 * CHUNK_SIZE);
|
||||
|
||||
let reads = planner.finish();
|
||||
assert_eq!(reads.len(), 2);
|
||||
@@ -947,7 +923,6 @@ mod tests {
|
||||
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
|
||||
let mut buf = BytesMut::with_capacity(reserved_bytes);
|
||||
|
||||
let align = virtual_file::get_io_buffer_alignment();
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&file);
|
||||
let meta = BlobMeta {
|
||||
key: Key::MIN,
|
||||
@@ -959,8 +934,7 @@ mod tests {
|
||||
if idx + 1 == offsets.len() {
|
||||
continue;
|
||||
}
|
||||
let read_builder =
|
||||
ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, align);
|
||||
let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
|
||||
let read = read_builder.build();
|
||||
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
|
||||
assert_eq!(result.blobs.len(), 1);
|
||||
|
||||
@@ -23,10 +23,12 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
|
||||
@@ -38,7 +40,7 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
|
||||
mod metadata;
|
||||
mod open_options;
|
||||
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
pub(crate) use api::DirectIoMode;
|
||||
pub(crate) use api::IoMode;
|
||||
pub(crate) use io_engine::IoEngineKind;
|
||||
pub(crate) use metadata::Metadata;
|
||||
pub(crate) use open_options::*;
|
||||
@@ -61,6 +63,171 @@ pub(crate) mod owned_buffers_io {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VirtualFile {
|
||||
inner: VirtualFileInner,
|
||||
_mode: IoMode,
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::open(path, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
})
|
||||
}
|
||||
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
///
|
||||
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
|
||||
pub async fn open_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
pub async fn create<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::create(path, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn create_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
VirtualFile::open_with_options_v2(
|
||||
path.as_ref(),
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let file = match get_io_mode() {
|
||||
IoMode::Buffered => {
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
}
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct => {
|
||||
let inner = VirtualFileInner::open_with_options(
|
||||
path,
|
||||
open_options.clone().custom_flags(nix::libc::O_DIRECT),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Direct,
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Utf8Path {
|
||||
self.inner.path.as_path()
|
||||
}
|
||||
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
|
||||
final_path: Utf8PathBuf,
|
||||
tmp_path: Utf8PathBuf,
|
||||
content: B,
|
||||
) -> std::io::Result<()> {
|
||||
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
|
||||
}
|
||||
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
self.inner.sync_all().await
|
||||
}
|
||||
|
||||
pub async fn sync_data(&self) -> Result<(), Error> {
|
||||
self.inner.sync_data().await
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<Metadata, Error> {
|
||||
self.inner.metadata().await
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
self.inner.remove();
|
||||
}
|
||||
|
||||
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
self.inner.seek(pos).await
|
||||
}
|
||||
|
||||
pub async fn read_exact_at<Buf>(
|
||||
&self,
|
||||
slice: Slice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
Buf: IoBufMut + Send,
|
||||
{
|
||||
self.inner.read_exact_at(slice, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn read_exact_at_page(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
self.inner.read_exact_at_page(page, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn write_all_at<Buf: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), Error>) {
|
||||
self.inner.write_all_at(buf, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<usize, Error>) {
|
||||
self.inner.write_all(buf, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -77,7 +244,7 @@ pub(crate) mod owned_buffers_io {
|
||||
/// 'tag' field is used to detect whether the handle still is valid or not.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct VirtualFile {
|
||||
pub struct VirtualFileInner {
|
||||
/// Lazy handle to the global file descriptor cache. The slot that this points to
|
||||
/// might contain our File, or it may be empty, or it may contain a File that
|
||||
/// belongs to a different VirtualFile.
|
||||
@@ -350,12 +517,12 @@ macro_rules! with_file {
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
impl VirtualFileInner {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
@@ -364,7 +531,7 @@ impl VirtualFile {
|
||||
pub async fn create<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(
|
||||
path.as_ref(),
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
@@ -382,7 +549,7 @@ impl VirtualFile {
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
let path_ref = path.as_ref();
|
||||
let path_str = path_ref.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
@@ -423,7 +590,7 @@ impl VirtualFile {
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path_ref.to_path_buf(),
|
||||
@@ -1034,6 +1201,21 @@ impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
self.inner.read_blk(blknum, ctx).await
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
self.inner.read_to_end(buf, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFileInner {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
@@ -1067,7 +1249,7 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
impl Drop for VirtualFileInner {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut();
|
||||
@@ -1143,15 +1325,10 @@ impl OpenFiles {
|
||||
/// server startup.
|
||||
///
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) {
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
if set_io_buffer_alignment(io_buffer_alignment).is_err() {
|
||||
panic!(
|
||||
"IO buffer alignment needs to be a power of two and greater than 512, got {io_buffer_alignment}"
|
||||
);
|
||||
}
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
@@ -1175,47 +1352,20 @@ fn get_open_files() -> &'static OpenFiles {
|
||||
}
|
||||
}
|
||||
|
||||
static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT);
|
||||
|
||||
/// Returns true if the alignment is a power of two and is greater or equal to 512.
|
||||
fn is_valid_io_buffer_alignment(align: usize) -> bool {
|
||||
align.is_power_of_two() && align >= 512
|
||||
}
|
||||
|
||||
/// Sets IO buffer alignment requirement. Returns error if the alignment requirement is
|
||||
/// not a power of two or less than 512 bytes.
|
||||
#[allow(unused)]
|
||||
pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> {
|
||||
if is_valid_io_buffer_alignment(align) {
|
||||
IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(align)
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the io buffer alignment.
|
||||
///
|
||||
/// This function should be used for getting the actual alignment value to use.
|
||||
pub(crate) fn get_io_buffer_alignment() -> usize {
|
||||
let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
if cfg!(test) {
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT";
|
||||
if let Some(test_align) = utils::env::var(env_var_name) {
|
||||
if is_valid_io_buffer_alignment(test_align) {
|
||||
test_align
|
||||
} else {
|
||||
panic!("IO buffer alignment needs to be a power of two and greater than 512, got {test_align}");
|
||||
}
|
||||
} else {
|
||||
align
|
||||
}
|
||||
} else {
|
||||
align
|
||||
}
|
||||
pub(crate) const fn get_io_buffer_alignment() -> usize {
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT
|
||||
}
|
||||
|
||||
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
|
||||
|
||||
pub(crate) fn set_io_mode(mode: IoMode) {
|
||||
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn get_io_mode() -> IoMode {
|
||||
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::context::DownloadBehavior;
|
||||
@@ -1524,7 +1674,7 @@ mod tests {
|
||||
// Open the file many times.
|
||||
let mut files = Vec::new();
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFile::open_with_options(
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
@@ -1576,7 +1726,7 @@ mod tests {
|
||||
let path = testdir.join("myfile");
|
||||
let tmp_path = testdir.join("myfile.tmp");
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
@@ -1585,7 +1735,7 @@ mod tests {
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
@@ -1608,7 +1758,7 @@ mod tests {
|
||||
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
|
||||
assert!(tmp_path.exists());
|
||||
|
||||
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
29
poetry.lock
generated
29
poetry.lock
generated
@@ -2095,6 +2095,7 @@ files = [
|
||||
{file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"},
|
||||
{file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"},
|
||||
@@ -2103,6 +2104,8 @@ files = [
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"},
|
||||
{file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"},
|
||||
{file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"},
|
||||
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"},
|
||||
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"},
|
||||
@@ -2584,6 +2587,7 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
|
||||
@@ -2729,21 +2733,22 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
||||
|
||||
[[package]]
|
||||
name = "responses"
|
||||
version = "0.21.0"
|
||||
version = "0.25.3"
|
||||
description = "A utility library for mocking out the `requests` Python library."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "responses-0.21.0-py3-none-any.whl", hash = "sha256:2dcc863ba63963c0c3d9ee3fa9507cbe36b7d7b0fccb4f0bdfd9e96c539b1487"},
|
||||
{file = "responses-0.21.0.tar.gz", hash = "sha256:b82502eb5f09a0289d8e209e7bad71ef3978334f56d09b444253d5ad67bf5253"},
|
||||
{file = "responses-0.25.3-py3-none-any.whl", hash = "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb"},
|
||||
{file = "responses-0.25.3.tar.gz", hash = "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
requests = ">=2.0,<3.0"
|
||||
urllib3 = ">=1.25.10"
|
||||
pyyaml = "*"
|
||||
requests = ">=2.30.0,<3.0"
|
||||
urllib3 = ">=1.25.10,<3.0"
|
||||
|
||||
[package.extras]
|
||||
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-localserver", "types-mock", "types-requests"]
|
||||
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"]
|
||||
|
||||
[[package]]
|
||||
name = "rfc3339-validator"
|
||||
@@ -3137,6 +3142,16 @@ files = [
|
||||
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
|
||||
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
|
||||
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
|
||||
{file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
|
||||
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
|
||||
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
|
||||
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},
|
||||
|
||||
@@ -39,7 +39,7 @@ http.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper0.workspace = true
|
||||
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
|
||||
hyper = { workspace = true, features = ["server", "http1", "http2"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
|
||||
http-body-util = { version = "0.1" }
|
||||
indexmap.workspace = true
|
||||
@@ -77,7 +77,7 @@ subtle.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres = { workspace = true, features = ["with-serde_json-1"] }
|
||||
tokio-postgres-rustls.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
@@ -101,7 +101,7 @@ jose-jwa = "0.1.2"
|
||||
jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
|
||||
signature = "2"
|
||||
ecdsa = "0.16"
|
||||
p256 = "0.13"
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
rsa = "0.9"
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -17,6 +17,8 @@ use crate::{
|
||||
RoleName,
|
||||
};
|
||||
|
||||
use super::ComputeCredentialKeys;
|
||||
|
||||
// TODO(conrad): make these configurable.
|
||||
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
|
||||
const MIN_RENEW: Duration = Duration::from_secs(30);
|
||||
@@ -241,7 +243,7 @@ impl JwkCacheEntryLock {
|
||||
endpoint: EndpointId,
|
||||
role_name: &RoleName,
|
||||
fetch: &F,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
) -> Result<ComputeCredentialKeys, anyhow::Error> {
|
||||
// JWT compact form is defined to be
|
||||
// <B64(Header)> || . || <B64(Payload)> || . || <B64(Signature)>
|
||||
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
|
||||
@@ -300,9 +302,9 @@ impl JwkCacheEntryLock {
|
||||
key => bail!("unsupported key type {key:?}"),
|
||||
};
|
||||
|
||||
let payload = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
|
||||
let payloadb = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payload)
|
||||
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payloadb)
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
|
||||
tracing::debug!(?payload, "JWT signature valid with claims");
|
||||
@@ -327,7 +329,7 @@ impl JwkCacheEntryLock {
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(ComputeCredentialKeys::JwtPayload(payloadb))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,7 +341,7 @@ impl JwkCache {
|
||||
role_name: &RoleName,
|
||||
fetch: &F,
|
||||
jwt: &str,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
) -> Result<ComputeCredentialKeys, anyhow::Error> {
|
||||
// try with just a read lock first
|
||||
let key = (endpoint.clone(), role_name.clone());
|
||||
let entry = self.map.get(&key).as_deref().map(Arc::clone);
|
||||
@@ -571,7 +573,7 @@ mod tests {
|
||||
use bytes::Bytes;
|
||||
use http::Response;
|
||||
use http_body_util::Full;
|
||||
use hyper1::service::service_fn;
|
||||
use hyper::service::service_fn;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use rand::rngs::OsRng;
|
||||
use rsa::pkcs8::DecodePrivateKey;
|
||||
@@ -736,7 +738,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
});
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
|
||||
let server = hyper1::server::conn::http1::Builder::new();
|
||||
let server = hyper::server::conn::http1::Builder::new();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
||||
@@ -175,10 +175,12 @@ impl ComputeUserInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(test, derive(Debug))]
|
||||
pub(crate) enum ComputeCredentialKeys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Password(Vec<u8>),
|
||||
AuthKeys(AuthKeys),
|
||||
JwtPayload(Vec<u8>),
|
||||
None,
|
||||
}
|
||||
|
||||
|
||||
@@ -309,7 +309,7 @@ impl NodeInfo {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => self.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
|
||||
ComputeCredentialKeys::None => &mut self.config,
|
||||
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::{anyhow, bail};
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use measured::{text::BufferedTextEncoder, MetricGroup};
|
||||
use metrics::NeonMetrics;
|
||||
use std::{
|
||||
@@ -21,7 +21,7 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, "")
|
||||
}
|
||||
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
|
||||
let state = Arc::new(Mutex::new(PrometheusHandler {
|
||||
encoder: BufferedTextEncoder::new(),
|
||||
metrics,
|
||||
@@ -45,7 +45,7 @@ pub async fn task_main(
|
||||
|
||||
let service = || RouterService::new(make_router(metrics).build()?);
|
||||
|
||||
hyper::Server::from_tcp(http_listener)?
|
||||
hyper0::Server::from_tcp(http_listener)?
|
||||
.serve(service().map_err(|e| anyhow!(e))?)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::time::Duration;
|
||||
use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper1::body::Body;
|
||||
use hyper::body::Body;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
pub(crate) use reqwest::{Request, Response};
|
||||
|
||||
@@ -90,8 +90,6 @@ use tokio::task::JoinError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
pub mod auth;
|
||||
pub mod cache;
|
||||
pub mod cancellation;
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::metrics::{
|
||||
WakeupFailureKind,
|
||||
};
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
use hyper1::StatusCode;
|
||||
use hyper::StatusCode;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::connect_compute::ComputeConnectBackend;
|
||||
|
||||
@@ -3,10 +3,12 @@ use std::{io, sync::Arc, time::Duration};
|
||||
use async_trait::async_trait;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
|
||||
use tokio::net::{lookup_host, TcpStream};
|
||||
use tracing::{field::display, info};
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tracing::{debug, field::display, info};
|
||||
|
||||
use crate::{
|
||||
auth::{
|
||||
self,
|
||||
backend::{local::StaticAuthRules, ComputeCredentials, ComputeUserInfo},
|
||||
check_peer_addr_is_in_list, AuthError,
|
||||
},
|
||||
@@ -32,10 +34,12 @@ use crate::{
|
||||
use super::{
|
||||
conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool},
|
||||
http_conn_pool::{self, poll_http2_client},
|
||||
local_conn_pool::{self, LocalClient, LocalConnPool},
|
||||
};
|
||||
|
||||
pub(crate) struct PoolingBackend {
|
||||
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool>,
|
||||
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
|
||||
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
pub(crate) config: &'static ProxyConfig,
|
||||
pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -112,7 +116,7 @@ impl PoolingBackend {
|
||||
config: &AuthenticationConfig,
|
||||
user_info: &ComputeUserInfo,
|
||||
jwt: String,
|
||||
) -> Result<(), AuthError> {
|
||||
) -> Result<ComputeCredentials, AuthError> {
|
||||
match &self.config.auth_backend {
|
||||
crate::auth::Backend::ControlPlane(console, ()) => {
|
||||
config
|
||||
@@ -127,13 +131,16 @@ impl PoolingBackend {
|
||||
.await
|
||||
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
Ok(ComputeCredentials {
|
||||
info: user_info.clone(),
|
||||
keys: crate::auth::backend::ComputeCredentialKeys::None,
|
||||
})
|
||||
}
|
||||
crate::auth::Backend::ConsoleRedirect(_, ()) => Err(AuthError::auth_failed(
|
||||
"JWT login over web auth proxy is not supported",
|
||||
)),
|
||||
crate::auth::Backend::Local(_) => {
|
||||
config
|
||||
let keys = config
|
||||
.jwks_cache
|
||||
.check_jwt(
|
||||
ctx,
|
||||
@@ -145,8 +152,10 @@ impl PoolingBackend {
|
||||
.await
|
||||
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
|
||||
|
||||
// todo: rewrite JWT signature with key shared somehow between local proxy and postgres
|
||||
Ok(())
|
||||
Ok(ComputeCredentials {
|
||||
info: user_info.clone(),
|
||||
keys,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -231,6 +240,77 @@ impl PoolingBackend {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Connect to postgres over localhost.
|
||||
///
|
||||
/// We expect postgres to be started here, so we won't do any retries.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if called with a non-local_proxy backend.
|
||||
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
|
||||
pub(crate) async fn connect_to_local_postgres(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
) -> Result<LocalClient<tokio_postgres::Client>, HttpConnError> {
|
||||
if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
|
||||
|
||||
let mut node_info = match &self.config.auth_backend {
|
||||
auth::Backend::ControlPlane(_, ()) | auth::Backend::ConsoleRedirect(_, ()) => {
|
||||
unreachable!("only local_proxy can connect to local postgres")
|
||||
}
|
||||
auth::Backend::Local(local) => local.node_info.clone(),
|
||||
};
|
||||
|
||||
let config = node_info
|
||||
.config
|
||||
.user(&conn_info.user_info.user)
|
||||
.dbname(&conn_info.dbname);
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
drop(pause);
|
||||
|
||||
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
|
||||
|
||||
let handle = local_conn_pool::poll_client(
|
||||
self.local_pool.clone(),
|
||||
ctx,
|
||||
conn_info,
|
||||
client,
|
||||
connection,
|
||||
conn_id,
|
||||
node_info.aux.clone(),
|
||||
);
|
||||
|
||||
let kid = handle.get_client().get_process_id() as i64;
|
||||
let jwk = p256::PublicKey::from(handle.key().verifying_key()).to_jwk();
|
||||
|
||||
debug!(kid, ?jwk, "setting up backend session state");
|
||||
|
||||
// initiates the auth session
|
||||
handle
|
||||
.get_client()
|
||||
.query(
|
||||
"select auth.init($1, $2);",
|
||||
&[
|
||||
&kid as &(dyn ToSql + Sync),
|
||||
&tokio_postgres::types::Json(jwk),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!(?kid, "backend session state init");
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -241,6 +321,8 @@ pub(crate) enum HttpConnError {
|
||||
PostgresConnectionError(#[from] tokio_postgres::Error),
|
||||
#[error("could not connection to local-proxy in compute")]
|
||||
LocalProxyConnectionError(#[from] LocalProxyConnError),
|
||||
#[error("could not parse JWT payload")]
|
||||
JwtPayloadError(serde_json::Error),
|
||||
|
||||
#[error("could not get auth info")]
|
||||
GetAuthInfo(#[from] GetAuthInfoError),
|
||||
@@ -257,7 +339,7 @@ pub(crate) enum LocalProxyConnError {
|
||||
#[error("error with connection to local-proxy")]
|
||||
Io(#[source] std::io::Error),
|
||||
#[error("could not establish h2 connection")]
|
||||
H2(#[from] hyper1::Error),
|
||||
H2(#[from] hyper::Error),
|
||||
}
|
||||
|
||||
impl ReportableError for HttpConnError {
|
||||
@@ -266,6 +348,7 @@ impl ReportableError for HttpConnError {
|
||||
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
|
||||
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
|
||||
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
|
||||
HttpConnError::JwtPayloadError(_) => ErrorKind::User,
|
||||
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
|
||||
HttpConnError::AuthError(a) => a.get_error_kind(),
|
||||
HttpConnError::WakeCompute(w) => w.get_error_kind(),
|
||||
@@ -280,6 +363,7 @@ impl UserFacingError for HttpConnError {
|
||||
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
|
||||
HttpConnError::PostgresConnectionError(p) => p.to_string(),
|
||||
HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
|
||||
HttpConnError::JwtPayloadError(p) => p.to_string(),
|
||||
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
|
||||
HttpConnError::AuthError(c) => c.to_string_client(),
|
||||
HttpConnError::WakeCompute(c) => c.to_string_client(),
|
||||
@@ -296,6 +380,7 @@ impl CouldRetry for HttpConnError {
|
||||
HttpConnError::PostgresConnectionError(e) => e.could_retry(),
|
||||
HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
|
||||
HttpConnError::ConnectionClosedAbruptly(_) => false,
|
||||
HttpConnError::JwtPayloadError(_) => false,
|
||||
HttpConnError::GetAuthInfo(_) => false,
|
||||
HttpConnError::AuthError(_) => false,
|
||||
HttpConnError::WakeCompute(_) => false,
|
||||
@@ -481,7 +566,7 @@ async fn connect_http2(
|
||||
};
|
||||
};
|
||||
|
||||
let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
|
||||
.timer(TokioTimer::new())
|
||||
.keep_alive_interval(Duration::from_secs(20))
|
||||
.keep_alive_while_idle(true)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use dashmap::DashMap;
|
||||
use hyper1::client::conn::http2;
|
||||
use hyper::client::conn::http2;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
@@ -18,9 +18,9 @@ use tracing::{info, info_span, Instrument};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
|
||||
pub(crate) type Send = http2::SendRequest<hyper1::body::Incoming>;
|
||||
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
|
||||
pub(crate) type Connect =
|
||||
http2::Connection<TokioIo<TcpStream>, hyper1::body::Incoming, TokioExecutor>;
|
||||
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ConnPoolEntry {
|
||||
|
||||
@@ -11,7 +11,7 @@ use serde::Serialize;
|
||||
use utils::http::error::ApiError;
|
||||
|
||||
/// Like [`ApiError::into_response`]
|
||||
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
match this {
|
||||
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
format!("{err:#?}"), // use debug printing so that we give the cause
|
||||
@@ -67,12 +67,12 @@ impl HttpErrorBody {
|
||||
fn response_from_msg_and_status(
|
||||
msg: String,
|
||||
status: StatusCode,
|
||||
) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
HttpErrorBody { msg }.to_response(status)
|
||||
}
|
||||
|
||||
/// Same as [`utils::http::error::HttpErrorBody::to_response`]
|
||||
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper1::Error>> {
|
||||
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
Response::builder()
|
||||
.status(status)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
@@ -90,7 +90,7 @@ impl HttpErrorBody {
|
||||
pub(crate) fn json_response<T: Serialize>(
|
||||
status: StatusCode,
|
||||
data: T,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let json = serde_json::to_string(&data)
|
||||
.context("Failed to serialize JSON response")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
544
proxy/src/serverless/local_conn_pool.rs
Normal file
544
proxy/src/serverless/local_conn_pool.rs
Normal file
@@ -0,0 +1,544 @@
|
||||
use futures::{future::poll_fn, Future};
|
||||
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
|
||||
use p256::ecdsa::{Signature, SigningKey};
|
||||
use parking_lot::RwLock;
|
||||
use rand::rngs::OsRng;
|
||||
use serde_json::Value;
|
||||
use signature::Signer;
|
||||
use std::task::{ready, Poll};
|
||||
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::tls::NoTlsStream;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use typed_json::json;
|
||||
|
||||
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
|
||||
use crate::{context::RequestMonitoring, DbName, RoleName};
|
||||
|
||||
use tracing::{debug, error, warn, Span};
|
||||
use tracing::{info, info_span, Instrument};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
use super::conn_pool::{ClientInnerExt, ConnInfo};
|
||||
|
||||
struct ConnPoolEntry<C: ClientInnerExt> {
|
||||
conn: ClientInner<C>,
|
||||
_last_access: std::time::Instant,
|
||||
}
|
||||
|
||||
// /// key id for the pg_session_jwt state
|
||||
// static PG_SESSION_JWT_KID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
|
||||
// Number of open connections is limited by the `max_conns_per_endpoint`.
|
||||
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
|
||||
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
|
||||
total_conns: usize,
|
||||
max_conns: usize,
|
||||
global_pool_size_max_conns: usize,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
|
||||
let Self {
|
||||
pools, total_conns, ..
|
||||
} = self;
|
||||
pools
|
||||
.get_mut(&db_user)
|
||||
.and_then(|pool_entries| pool_entries.get_conn_entry(total_conns))
|
||||
}
|
||||
|
||||
fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
|
||||
let Self {
|
||||
pools, total_conns, ..
|
||||
} = self;
|
||||
if let Some(pool) = pools.get_mut(&db_user) {
|
||||
let old_len = pool.conns.len();
|
||||
pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
|
||||
let new_len = pool.conns.len();
|
||||
let removed = old_len - new_len;
|
||||
if removed > 0 {
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(removed as i64);
|
||||
}
|
||||
*total_conns -= removed;
|
||||
removed > 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
|
||||
let conn_id = client.conn_id;
|
||||
|
||||
if client.is_closed() {
|
||||
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because connection is closed");
|
||||
return;
|
||||
}
|
||||
let global_max_conn = pool.read().global_pool_size_max_conns;
|
||||
if pool.read().total_conns >= global_max_conn {
|
||||
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full");
|
||||
return;
|
||||
}
|
||||
|
||||
// return connection to the pool
|
||||
let mut returned = false;
|
||||
let mut per_db_size = 0;
|
||||
let total_conns = {
|
||||
let mut pool = pool.write();
|
||||
|
||||
if pool.total_conns < pool.max_conns {
|
||||
let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
|
||||
pool_entries.conns.push(ConnPoolEntry {
|
||||
conn: client,
|
||||
_last_access: std::time::Instant::now(),
|
||||
});
|
||||
|
||||
returned = true;
|
||||
per_db_size = pool_entries.conns.len();
|
||||
|
||||
pool.total_conns += 1;
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.inc();
|
||||
}
|
||||
|
||||
pool.total_conns
|
||||
};
|
||||
|
||||
// do logging outside of the mutex
|
||||
if returned {
|
||||
info!(%conn_id, "local_pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
|
||||
} else {
|
||||
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
|
||||
fn drop(&mut self) {
|
||||
if self.total_conns > 0 {
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(self.total_conns as i64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
|
||||
conns: Vec<ConnPoolEntry<C>>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
|
||||
fn default() -> Self {
|
||||
Self { conns: Vec::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> DbUserConnPool<C> {
|
||||
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
|
||||
let old_len = self.conns.len();
|
||||
|
||||
self.conns.retain(|conn| !conn.conn.is_closed());
|
||||
|
||||
let new_len = self.conns.len();
|
||||
let removed = old_len - new_len;
|
||||
*conns -= removed;
|
||||
removed
|
||||
}
|
||||
|
||||
fn get_conn_entry(&mut self, conns: &mut usize) -> Option<ConnPoolEntry<C>> {
|
||||
let mut removed = self.clear_closed_clients(conns);
|
||||
let conn = self.conns.pop();
|
||||
if conn.is_some() {
|
||||
*conns -= 1;
|
||||
removed += 1;
|
||||
}
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(removed as i64);
|
||||
conn
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LocalConnPool<C: ClientInnerExt> {
|
||||
global_pool: RwLock<EndpointConnPool<C>>,
|
||||
|
||||
config: &'static crate::config::HttpConfig,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> LocalConnPool<C> {
|
||||
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
global_pool: RwLock::new(EndpointConnPool {
|
||||
pools: HashMap::new(),
|
||||
total_conns: 0,
|
||||
max_conns: config.pool_options.max_conns_per_endpoint,
|
||||
global_pool_size_max_conns: config.pool_options.max_total_conns,
|
||||
}),
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn get_idle_timeout(&self) -> Duration {
|
||||
self.config.pool_options.idle_timeout
|
||||
}
|
||||
|
||||
// pub(crate) fn shutdown(&self) {
|
||||
// let mut pool = self.global_pool.write();
|
||||
// pool.pools.clear();
|
||||
// pool.total_conns = 0;
|
||||
// }
|
||||
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestMonitoring,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<Option<LocalClient<C>>, HttpConnError> {
|
||||
let mut client: Option<ClientInner<C>> = None;
|
||||
if let Some(entry) = self
|
||||
.global_pool
|
||||
.write()
|
||||
.get_conn_entry(conn_info.db_and_user())
|
||||
{
|
||||
client = Some(entry.conn);
|
||||
}
|
||||
|
||||
// ok return cached connection if found and establish a new one otherwise
|
||||
if let Some(client) = client {
|
||||
if client.is_closed() {
|
||||
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
}
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"local_pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id())?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(LocalClient::new(
|
||||
client,
|
||||
conn_info.clone(),
|
||||
Arc::downgrade(self),
|
||||
)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_client(
|
||||
global_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
|
||||
ctx: &RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
client: tokio_postgres::Client,
|
||||
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> LocalClient<tokio_postgres::Client> {
|
||||
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
|
||||
let mut session_id = ctx.session_id();
|
||||
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
|
||||
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
let cold_start_info = ctx.cold_start_info();
|
||||
span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
|
||||
});
|
||||
let pool = Arc::downgrade(&global_pool);
|
||||
let pool_clone = pool.clone();
|
||||
|
||||
let db_user = conn_info.db_and_user();
|
||||
let idle = global_pool.get_idle_timeout();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancelled = cancel.clone().cancelled_owned();
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let _conn_gauge = conn_gauge;
|
||||
let mut idle_timeout = pin!(tokio::time::sleep(idle));
|
||||
let mut cancelled = pin!(cancelled);
|
||||
|
||||
poll_fn(move |cx| {
|
||||
if cancelled.as_mut().poll(cx).is_ready() {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
|
||||
match rx.has_changed() {
|
||||
Ok(true) => {
|
||||
session_id = *rx.borrow_and_update();
|
||||
info!(%session_id, "changed session");
|
||||
idle_timeout.as_mut().reset(Instant::now() + idle);
|
||||
}
|
||||
Err(_) => {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// 5 minute idle connection timeout
|
||||
if idle_timeout.as_mut().poll(cx).is_ready() {
|
||||
idle_timeout.as_mut().reset(Instant::now() + idle);
|
||||
info!("connection idle");
|
||||
if let Some(pool) = pool.clone().upgrade() {
|
||||
// remove client from pool - should close the connection if it's idle.
|
||||
// does nothing if the client is currently checked-out and in-use
|
||||
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
|
||||
info!("idle connection removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session_id, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session_id, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session_id, "connection error: {}", e);
|
||||
break
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(pool) = pool.clone().upgrade() {
|
||||
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
|
||||
info!("closed connection removed");
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
}).await;
|
||||
|
||||
}
|
||||
.instrument(span));
|
||||
|
||||
let key = SigningKey::random(&mut OsRng);
|
||||
|
||||
let inner = ClientInner {
|
||||
inner: client,
|
||||
session: tx,
|
||||
cancel,
|
||||
aux,
|
||||
conn_id,
|
||||
key,
|
||||
jti: 0,
|
||||
};
|
||||
LocalClient::new(inner, conn_info, pool_clone)
|
||||
}
|
||||
|
||||
struct ClientInner<C: ClientInnerExt> {
|
||||
inner: C,
|
||||
session: tokio::sync::watch::Sender<uuid::Uuid>,
|
||||
cancel: CancellationToken,
|
||||
aux: MetricsAuxInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
// needed for pg_session_jwt state
|
||||
key: SigningKey,
|
||||
jti: u64,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Drop for ClientInner<C> {
|
||||
fn drop(&mut self) {
|
||||
// on client drop, tell the conn to shut down
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> ClientInner<C> {
|
||||
pub(crate) fn is_closed(&self) -> bool {
|
||||
self.inner.is_closed()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> LocalClient<C> {
|
||||
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
|
||||
let aux = &self.inner.as_ref().unwrap().aux;
|
||||
USAGE_METRICS.register(Ids {
|
||||
endpoint_id: aux.endpoint_id,
|
||||
branch_id: aux.branch_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LocalClient<C: ClientInnerExt> {
|
||||
span: Span,
|
||||
inner: Option<ClientInner<C>>,
|
||||
conn_info: ConnInfo,
|
||||
pool: Weak<LocalConnPool<C>>,
|
||||
}
|
||||
|
||||
pub(crate) struct Discard<'a, C: ClientInnerExt> {
|
||||
conn_info: &'a ConnInfo,
|
||||
pool: &'a mut Weak<LocalConnPool<C>>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> LocalClient<C> {
|
||||
pub(self) fn new(
|
||||
inner: ClientInner<C>,
|
||||
conn_info: ConnInfo,
|
||||
pool: Weak<LocalConnPool<C>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Some(inner),
|
||||
span: Span::current(),
|
||||
conn_info,
|
||||
pool,
|
||||
}
|
||||
}
|
||||
pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
|
||||
let Self {
|
||||
inner,
|
||||
pool,
|
||||
conn_info,
|
||||
span: _,
|
||||
} = self;
|
||||
let inner = inner.as_mut().expect("client inner should not be removed");
|
||||
(&mut inner.inner, Discard { conn_info, pool })
|
||||
}
|
||||
pub(crate) fn key(&self) -> &SigningKey {
|
||||
let inner = &self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("client inner should not be removed");
|
||||
&inner.key
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalClient<tokio_postgres::Client> {
|
||||
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
|
||||
let inner = self
|
||||
.inner
|
||||
.as_mut()
|
||||
.expect("client inner should not be removed");
|
||||
inner.jti += 1;
|
||||
|
||||
let kid = inner.inner.get_process_id();
|
||||
let header = json!({"kid":kid}).to_string();
|
||||
|
||||
let mut payload = serde_json::from_slice::<serde_json::Map<String, Value>>(payload)
|
||||
.map_err(HttpConnError::JwtPayloadError)?;
|
||||
payload.insert("jti".to_string(), Value::Number(inner.jti.into()));
|
||||
let payload = Value::Object(payload).to_string();
|
||||
|
||||
debug!(
|
||||
kid,
|
||||
jti = inner.jti,
|
||||
?header,
|
||||
?payload,
|
||||
"signing new ephemeral JWT"
|
||||
);
|
||||
|
||||
let token = sign_jwt(&inner.key, header, payload);
|
||||
|
||||
// initiates the auth session
|
||||
inner.inner.simple_query("discard all").await?;
|
||||
inner
|
||||
.inner
|
||||
.query(
|
||||
"select auth.jwt_session_init($1)",
|
||||
&[&token as &(dyn ToSql + Sync)],
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!(kid, jti = inner.jti, "user session state init");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn sign_jwt(sk: &SigningKey, header: String, payload: String) -> String {
|
||||
let header = Base64UrlUnpadded::encode_string(header.as_bytes());
|
||||
let payload = Base64UrlUnpadded::encode_string(payload.as_bytes());
|
||||
|
||||
let message = format!("{header}.{payload}");
|
||||
let sig: Signature = sk.sign(message.as_bytes());
|
||||
let base64_sig = Base64UrlUnpadded::encode_string(&sig.to_bytes());
|
||||
format!("{message}.{base64_sig}")
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Discard<'_, C> {
|
||||
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
|
||||
let conn_info = &self.conn_info;
|
||||
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
|
||||
info!(
|
||||
"local_pool: throwing away connection '{conn_info}' because connection is not idle"
|
||||
);
|
||||
}
|
||||
}
|
||||
pub(crate) fn discard(&mut self) {
|
||||
let conn_info = &self.conn_info;
|
||||
if std::mem::take(self.pool).strong_count() > 0 {
|
||||
info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> LocalClient<C> {
|
||||
pub fn get_client(&self) -> &C {
|
||||
&self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("client inner should not be removed")
|
||||
.inner
|
||||
}
|
||||
|
||||
fn do_drop(&mut self) -> Option<impl FnOnce()> {
|
||||
let conn_info = self.conn_info.clone();
|
||||
let client = self
|
||||
.inner
|
||||
.take()
|
||||
.expect("client inner should not be removed");
|
||||
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
|
||||
let current_span = self.span.clone();
|
||||
// return connection to the pool
|
||||
return Some(move || {
|
||||
let _span = current_span.enter();
|
||||
EndpointConnPool::put(&conn_pool.global_pool, &conn_info, client);
|
||||
});
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Drop for LocalClient<C> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(drop) = self.do_drop() {
|
||||
tokio::task::spawn_blocking(drop);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ mod conn_pool;
|
||||
mod http_conn_pool;
|
||||
mod http_util;
|
||||
mod json;
|
||||
mod local_conn_pool;
|
||||
mod sql_over_http;
|
||||
mod websocket;
|
||||
|
||||
@@ -22,7 +23,7 @@ use futures::TryFutureExt;
|
||||
use http::{Method, Response, StatusCode};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Empty};
|
||||
use hyper1::body::Incoming;
|
||||
use hyper::body::Incoming;
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
use hyper_util::server::conn::auto::Builder;
|
||||
use rand::rngs::StdRng;
|
||||
@@ -63,6 +64,7 @@ pub async fn task_main(
|
||||
info!("websocket server has shut down");
|
||||
}
|
||||
|
||||
let local_pool = local_conn_pool::LocalConnPool::new(&config.http_config);
|
||||
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
|
||||
{
|
||||
let conn_pool = Arc::clone(&conn_pool);
|
||||
@@ -105,6 +107,7 @@ pub async fn task_main(
|
||||
|
||||
let backend = Arc::new(PoolingBackend {
|
||||
http_conn_pool: Arc::clone(&http_conn_pool),
|
||||
local_pool,
|
||||
pool: Arc::clone(&conn_pool),
|
||||
config,
|
||||
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
|
||||
@@ -302,7 +305,7 @@ async fn connection_handler(
|
||||
let server = Builder::new(TokioExecutor::new());
|
||||
let conn = server.serve_connection_with_upgrades(
|
||||
hyper_util::rt::TokioIo::new(conn),
|
||||
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
|
||||
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
|
||||
// First HTTP request shares the same session ID
|
||||
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
||||
|
||||
@@ -355,7 +358,7 @@ async fn connection_handler(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn request_handler(
|
||||
mut request: hyper1::Request<Incoming>,
|
||||
mut request: hyper::Request<Incoming>,
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
ws_connections: TaskTracker,
|
||||
@@ -365,7 +368,7 @@ async fn request_handler(
|
||||
// used to cancel in-flight HTTP requests. not used to cancel websockets
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
.get("host")
|
||||
|
||||
@@ -12,14 +12,14 @@ use http::Method;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::Full;
|
||||
use hyper1::body::Body;
|
||||
use hyper1::body::Incoming;
|
||||
use hyper1::header;
|
||||
use hyper1::http::HeaderName;
|
||||
use hyper1::http::HeaderValue;
|
||||
use hyper1::Response;
|
||||
use hyper1::StatusCode;
|
||||
use hyper1::{HeaderMap, Request};
|
||||
use hyper::body::Body;
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header;
|
||||
use hyper::http::HeaderName;
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Response;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{HeaderMap, Request};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@@ -40,7 +40,7 @@ use url::Url;
|
||||
use urlencoding;
|
||||
use utils::http::error::ApiError;
|
||||
|
||||
use crate::auth::backend::ComputeCredentials;
|
||||
use crate::auth::backend::ComputeCredentialKeys;
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::endpoint_sni;
|
||||
use crate::auth::ComputeUserInfoParseError;
|
||||
@@ -56,20 +56,22 @@ use crate::metrics::Metrics;
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::usage_metrics::MetricCounter;
|
||||
use crate::usage_metrics::MetricCounterRecorder;
|
||||
use crate::DbName;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::backend::LocalProxyConnError;
|
||||
use super::backend::PoolingBackend;
|
||||
use super::conn_pool;
|
||||
use super::conn_pool::AuthData;
|
||||
use super::conn_pool::Client;
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::ConnInfoWithAuth;
|
||||
use super::http_util::json_response;
|
||||
use super::json::json_to_pg_text;
|
||||
use super::json::pg_text_row_to_json;
|
||||
use super::json::JsonConversionError;
|
||||
use super::local_conn_pool;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -272,7 +274,7 @@ pub(crate) async fn handle(
|
||||
request: Request<Incoming>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let result = handle_inner(cancel, config, &ctx, request, backend).await;
|
||||
|
||||
let mut response = match result {
|
||||
@@ -435,7 +437,7 @@ impl UserFacingError for SqlOverHttpError {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ReadPayloadError {
|
||||
#[error("could not read the HTTP request body: {0}")]
|
||||
Read(#[from] hyper1::Error),
|
||||
Read(#[from] hyper::Error),
|
||||
#[error("could not parse the HTTP request body: {0}")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
}
|
||||
@@ -476,7 +478,7 @@ struct HttpHeaders {
|
||||
}
|
||||
|
||||
impl HttpHeaders {
|
||||
fn try_parse(headers: &hyper1::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
|
||||
fn try_parse(headers: &hyper::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
|
||||
// Determine the output options. Default behaviour is 'false'. Anything that is not
|
||||
// strictly 'true' assumed to be false.
|
||||
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
|
||||
@@ -529,7 +531,7 @@ async fn handle_inner(
|
||||
ctx: &RequestMonitoring,
|
||||
request: Request<Incoming>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
let _requeset_gauge = Metrics::get()
|
||||
.proxy
|
||||
.connection_requests
|
||||
@@ -577,7 +579,7 @@ async fn handle_db_inner(
|
||||
conn_info: ConnInfo,
|
||||
auth: AuthData,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -620,6 +622,9 @@ async fn handle_db_inner(
|
||||
|
||||
let authenticate_and_connect = Box::pin(
|
||||
async {
|
||||
let is_local_proxy =
|
||||
matches!(backend.config.auth_backend, crate::auth::Backend::Local(_));
|
||||
|
||||
let keys = match auth {
|
||||
AuthData::Password(pw) => {
|
||||
backend
|
||||
@@ -639,18 +644,24 @@ async fn handle_db_inner(
|
||||
&conn_info.user_info,
|
||||
jwt,
|
||||
)
|
||||
.await?;
|
||||
|
||||
ComputeCredentials {
|
||||
info: conn_info.user_info.clone(),
|
||||
keys: crate::auth::backend::ComputeCredentialKeys::None,
|
||||
}
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
let client = match keys.keys {
|
||||
ComputeCredentialKeys::JwtPayload(payload) if is_local_proxy => {
|
||||
let mut client = backend.connect_to_local_postgres(ctx, conn_info).await?;
|
||||
client.set_jwt_session(&payload).await?;
|
||||
Client::Local(client)
|
||||
}
|
||||
_ => {
|
||||
let client = backend
|
||||
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
|
||||
.await?;
|
||||
Client::Remote(client)
|
||||
}
|
||||
};
|
||||
|
||||
let client = backend
|
||||
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
|
||||
.await?;
|
||||
// not strictly necessary to mark success here,
|
||||
// but it's just insurance for if we forget it somewhere else
|
||||
ctx.success();
|
||||
@@ -744,7 +755,7 @@ async fn handle_auth_broker_inner(
|
||||
conn_info: ConnInfo,
|
||||
jwt: String,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
|
||||
backend
|
||||
.authenticate_with_jwt(
|
||||
ctx,
|
||||
@@ -791,7 +802,7 @@ impl QueryData {
|
||||
self,
|
||||
config: &'static ProxyConfig,
|
||||
cancel: CancellationToken,
|
||||
client: &mut Client<tokio_postgres::Client>,
|
||||
client: &mut Client,
|
||||
parsed_headers: HttpHeaders,
|
||||
) -> Result<String, SqlOverHttpError> {
|
||||
let (inner, mut discard) = client.inner();
|
||||
@@ -865,7 +876,7 @@ impl BatchQueryData {
|
||||
self,
|
||||
config: &'static ProxyConfig,
|
||||
cancel: CancellationToken,
|
||||
client: &mut Client<tokio_postgres::Client>,
|
||||
client: &mut Client,
|
||||
parsed_headers: HttpHeaders,
|
||||
) -> Result<String, SqlOverHttpError> {
|
||||
info!("starting transaction");
|
||||
@@ -1058,3 +1069,50 @@ async fn query_to_json<T: GenericClient>(
|
||||
|
||||
Ok((ready, results))
|
||||
}
|
||||
|
||||
enum Client {
|
||||
Remote(conn_pool::Client<tokio_postgres::Client>),
|
||||
Local(local_conn_pool::LocalClient<tokio_postgres::Client>),
|
||||
}
|
||||
|
||||
enum Discard<'a> {
|
||||
Remote(conn_pool::Discard<'a, tokio_postgres::Client>),
|
||||
Local(local_conn_pool::Discard<'a, tokio_postgres::Client>),
|
||||
}
|
||||
|
||||
impl Client {
|
||||
fn metrics(&self) -> Arc<MetricCounter> {
|
||||
match self {
|
||||
Client::Remote(client) => client.metrics(),
|
||||
Client::Local(local_client) => local_client.metrics(),
|
||||
}
|
||||
}
|
||||
|
||||
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
|
||||
match self {
|
||||
Client::Remote(client) => {
|
||||
let (c, d) = client.inner();
|
||||
(c, Discard::Remote(d))
|
||||
}
|
||||
Client::Local(local_client) => {
|
||||
let (c, d) = local_client.inner();
|
||||
(c, Discard::Local(d))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Discard<'_> {
|
||||
fn check_idle(&mut self, status: ReadyForQueryStatus) {
|
||||
match self {
|
||||
Discard::Remote(discard) => discard.check_idle(status),
|
||||
Discard::Local(discard) => discard.check_idle(status),
|
||||
}
|
||||
}
|
||||
fn discard(&mut self) {
|
||||
match self {
|
||||
Discard::Remote(discard) => discard.discard(),
|
||||
Discard::Local(discard) => discard.discard(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use anyhow::Context as _;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use framed_websockets::{Frame, OpCode, WebSocketServer};
|
||||
use futures::{Sink, Stream};
|
||||
use hyper1::upgrade::OnUpgrade;
|
||||
use hyper::upgrade::OnUpgrade;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
|
||||
@@ -485,49 +485,51 @@ async fn upload_events_chunk(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
net::TcpListener,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use super::*;
|
||||
|
||||
use crate::{http, BranchId, EndpointId};
|
||||
use anyhow::Error;
|
||||
use chrono::Utc;
|
||||
use consumption_metrics::{Event, EventChunk};
|
||||
use hyper::{
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Response,
|
||||
};
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::net::TcpListener;
|
||||
use url::Url;
|
||||
|
||||
use super::*;
|
||||
use crate::{http, BranchId, EndpointId};
|
||||
|
||||
#[tokio::test]
|
||||
async fn metrics() {
|
||||
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
|
||||
type Report = EventChunk<'static, Event<Ids, String>>;
|
||||
let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
|
||||
|
||||
let reports = Arc::new(Mutex::new(vec![]));
|
||||
let reports2 = reports.clone();
|
||||
|
||||
let server = hyper::server::Server::from_tcp(listener)
|
||||
.unwrap()
|
||||
.serve(make_service_fn(move |_| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn({
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
loop {
|
||||
if let Ok((stream, _addr)) = listener.accept().await {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
let bytes = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let events: EventChunk<'static, Event<Ids, String>> =
|
||||
serde_json::from_slice(&bytes)?;
|
||||
reports.lock().unwrap().push(events);
|
||||
Ok::<_, Error>(Response::new(Body::from(vec![])))
|
||||
}
|
||||
}))
|
||||
http1::Builder::new()
|
||||
.serve_connection(
|
||||
TokioIo::new(stream),
|
||||
service_fn(move |req: Request<Incoming>| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
let bytes = req.into_body().collect().await?.to_bytes();
|
||||
let events = serde_json::from_slice(&bytes)?;
|
||||
reports.lock().unwrap().push(events);
|
||||
Ok::<_, Error>(Response::new(String::new()))
|
||||
}
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}));
|
||||
let addr = server.local_addr();
|
||||
tokio::spawn(server);
|
||||
}
|
||||
});
|
||||
|
||||
let metrics = Metrics::default();
|
||||
let client = http::new_client();
|
||||
@@ -536,7 +538,7 @@ mod tests {
|
||||
|
||||
// no counters have been registered
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// register a new counter
|
||||
@@ -548,7 +550,7 @@ mod tests {
|
||||
|
||||
// the counter should be observed despite 0 egress
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 0);
|
||||
@@ -558,7 +560,7 @@ mod tests {
|
||||
|
||||
// egress should be observered
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 1);
|
||||
@@ -568,7 +570,7 @@ mod tests {
|
||||
|
||||
// we do not observe the counter
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
let r = std::mem::take(&mut *reports.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// counter is unregistered
|
||||
|
||||
@@ -12,8 +12,8 @@ use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
register_histogram_vec, register_int_counter, register_int_counter_pair,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
|
||||
IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
|
||||
HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -231,6 +231,14 @@ pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(||
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"safekeeper_evicted_timelines",
|
||||
"Number of currently evicted timelines"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
/// Labels for traffic metrics.
|
||||
|
||||
@@ -631,13 +631,19 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
|
||||
self.bootstrap(
|
||||
shared_state,
|
||||
conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap new or existing timeline starting background tasks.
|
||||
pub fn bootstrap(
|
||||
self: &Arc<Timeline>,
|
||||
_shared_state: &mut WriteGuardSharedState<'_>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
|
||||
@@ -15,7 +15,9 @@ use tracing::{debug, info, instrument, warn};
|
||||
use utils::crashsafe::durable_rename;
|
||||
|
||||
use crate::{
|
||||
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
|
||||
metrics::{
|
||||
EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, NUM_EVICTED_TIMELINES,
|
||||
},
|
||||
rate_limit::rand_duration,
|
||||
timeline_manager::{Manager, StateSnapshot},
|
||||
wal_backup,
|
||||
@@ -93,6 +95,7 @@ impl Manager {
|
||||
}
|
||||
|
||||
info!("successfully evicted timeline");
|
||||
NUM_EVICTED_TIMELINES.inc();
|
||||
}
|
||||
|
||||
/// Attempt to restore evicted timeline from remote storage; it must be
|
||||
@@ -128,6 +131,7 @@ impl Manager {
|
||||
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
|
||||
|
||||
info!("successfully restored evicted timeline");
|
||||
NUM_EVICTED_TIMELINES.dec();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,10 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
control_file::{FileStorage, Storage},
|
||||
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
|
||||
metrics::{
|
||||
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS,
|
||||
NUM_EVICTED_TIMELINES,
|
||||
},
|
||||
rate_limit::{rand_duration, RateLimiter},
|
||||
recovery::recovery_main,
|
||||
remove_wal::calc_horizon_lsn,
|
||||
@@ -251,6 +254,11 @@ pub async fn main_task(
|
||||
mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
|
||||
}
|
||||
|
||||
// If timeline is evicted, reflect that in the metric.
|
||||
if mgr.is_offloaded {
|
||||
NUM_EVICTED_TIMELINES.inc();
|
||||
}
|
||||
|
||||
let last_state = 'outer: loop {
|
||||
MANAGER_ITERATIONS_TOTAL.inc();
|
||||
|
||||
@@ -367,6 +375,11 @@ pub async fn main_task(
|
||||
mgr.update_wal_removal_end(res);
|
||||
}
|
||||
|
||||
// If timeline is deleted while evicted decrement the gauge.
|
||||
if mgr.tli.is_cancelled() && mgr.is_offloaded {
|
||||
NUM_EVICTED_TIMELINES.dec();
|
||||
}
|
||||
|
||||
mgr.set_status(Status::Finished);
|
||||
}
|
||||
|
||||
|
||||
@@ -165,12 +165,14 @@ impl GlobalTimelines {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set.clone(),
|
||||
partial_backup_rate_limiter.clone(),
|
||||
@@ -213,6 +215,7 @@ impl GlobalTimelines {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
{
|
||||
@@ -227,8 +230,13 @@ impl GlobalTimelines {
|
||||
state.timelines.insert(ttid, tli.clone());
|
||||
}
|
||||
|
||||
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
|
||||
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
drop(shared_state);
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
|
||||
@@ -17,7 +17,9 @@ use std::time::Duration;
|
||||
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
|
||||
use remote_storage::{
|
||||
DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
|
||||
};
|
||||
use tokio::fs::File;
|
||||
|
||||
use tokio::select;
|
||||
@@ -503,8 +505,12 @@ pub async fn read_object(
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let opts = DownloadOpts {
|
||||
byte_start: std::ops::Bound::Included(offset),
|
||||
..Default::default()
|
||||
};
|
||||
let download = storage
|
||||
.download_storage_object(Some((offset, None)), file_path, &cancel)
|
||||
.download(file_path, &opts, &cancel)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open WAL segment download stream for remote path {file_path:?}")
|
||||
|
||||
@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
|
||||
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
|
||||
@@ -45,8 +45,15 @@ pub(super) struct Reconciler {
|
||||
pub(crate) reconciler_config: ReconcilerConfig,
|
||||
|
||||
pub(crate) config: TenantConfig,
|
||||
|
||||
/// Observed state from the point of view of the reconciler.
|
||||
/// This gets updated as the reconciliation makes progress.
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
/// Snapshot of the observed state at the point when the reconciler
|
||||
/// was spawned.
|
||||
pub(crate) original_observed: ObservedState,
|
||||
|
||||
pub(crate) service_config: service::Config,
|
||||
|
||||
/// A hook to notify the running postgres instances when we change the location
|
||||
@@ -846,6 +853,39 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Compare the observed state snapshot from when the reconcile was created
|
||||
/// with the final observed state in order to generate observed state deltas.
|
||||
pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
|
||||
let mut deltas = Vec::default();
|
||||
|
||||
for (node_id, location) in &self.observed.locations {
|
||||
let previous_location = self.original_observed.locations.get(node_id);
|
||||
let do_upsert = match previous_location {
|
||||
// Location config changed for node
|
||||
Some(prev) if location.conf != prev.conf => true,
|
||||
// New location config for node
|
||||
None => true,
|
||||
// Location config has not changed for node
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if do_upsert {
|
||||
deltas.push(ObservedStateDelta::Upsert(Box::new((
|
||||
*node_id,
|
||||
location.clone(),
|
||||
))));
|
||||
}
|
||||
}
|
||||
|
||||
for node_id in self.original_observed.locations.keys() {
|
||||
if !self.observed.locations.contains_key(node_id) {
|
||||
deltas.push(ObservedStateDelta::Delete(*node_id));
|
||||
}
|
||||
}
|
||||
|
||||
deltas
|
||||
}
|
||||
|
||||
/// Keep trying to notify the compute indefinitely, only dropping out if:
|
||||
/// - the node `origin` becomes unavailable -> Ok(())
|
||||
/// - the node `origin` no longer has our tenant shard attached -> Ok(())
|
||||
|
||||
@@ -28,8 +28,8 @@ use crate::{
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
ScheduleOptimization, ScheduleOptimizationAction,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
@@ -1072,7 +1072,7 @@ impl Service {
|
||||
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
|
||||
sequence=%result.sequence
|
||||
))]
|
||||
fn process_result(&self, mut result: ReconcileResult) {
|
||||
fn process_result(&self, result: ReconcileResult) {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
|
||||
@@ -1094,22 +1094,27 @@ impl Service {
|
||||
|
||||
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
|
||||
// make to the tenant
|
||||
result
|
||||
.observed
|
||||
.locations
|
||||
.retain(|node_id, _loc| nodes.contains_key(node_id));
|
||||
let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
|
||||
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
|
||||
// make to the tenant
|
||||
let node = nodes.get(delta.node_id())?;
|
||||
|
||||
if node.is_available() {
|
||||
return Some(delta);
|
||||
}
|
||||
|
||||
// In case a node became unavailable concurrently with the reconcile, observed
|
||||
// locations on it are now uncertain. By convention, set them to None in order
|
||||
// for them to get refreshed when the node comes back online.
|
||||
Some(ObservedStateDelta::Upsert(Box::new((
|
||||
node.get_id(),
|
||||
ObservedStateLocation { conf: None },
|
||||
))))
|
||||
});
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
|
||||
tenant.observed = result.observed;
|
||||
tenant.apply_observed_deltas(deltas);
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1131,9 +1136,10 @@ impl Service {
|
||||
// so that waiters will see the correct error after waiting.
|
||||
tenant.set_last_error(result.sequence, e);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
// Skip deletions on reconcile failures
|
||||
let upsert_deltas =
|
||||
deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
|
||||
tenant.apply_observed_deltas(upsert_deltas);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -425,6 +425,22 @@ pub(crate) enum ReconcileNeeded {
|
||||
Yes,
|
||||
}
|
||||
|
||||
/// Pending modification to the observed state of a tenant shard.
|
||||
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
|
||||
pub(crate) enum ObservedStateDelta {
|
||||
Upsert(Box<(NodeId, ObservedStateLocation)>),
|
||||
Delete(NodeId),
|
||||
}
|
||||
|
||||
impl ObservedStateDelta {
|
||||
pub(crate) fn node_id(&self) -> &NodeId {
|
||||
match self {
|
||||
Self::Upsert(up) => &up.0,
|
||||
Self::Delete(nid) => nid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// When a reconcile task completes, it sends this result object
|
||||
/// to be applied to the primary TenantShard.
|
||||
pub(crate) struct ReconcileResult {
|
||||
@@ -437,7 +453,7 @@ pub(crate) struct ReconcileResult {
|
||||
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) generation: Option<Generation>,
|
||||
pub(crate) observed: ObservedState,
|
||||
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
|
||||
|
||||
/// Set [`TenantShard::pending_compute_notification`] from this flag
|
||||
pub(crate) pending_compute_notification: bool,
|
||||
@@ -1123,7 +1139,7 @@ impl TenantShard {
|
||||
result,
|
||||
tenant_shard_id: reconciler.tenant_shard_id,
|
||||
generation: reconciler.generation,
|
||||
observed: reconciler.observed,
|
||||
observed_deltas: reconciler.observed_deltas(),
|
||||
pending_compute_notification: reconciler.compute_notify_failure,
|
||||
}
|
||||
}
|
||||
@@ -1177,6 +1193,7 @@ impl TenantShard {
|
||||
reconciler_config,
|
||||
config: self.config.clone(),
|
||||
observed: self.observed.clone(),
|
||||
original_observed: self.observed.clone(),
|
||||
compute_hook: compute_hook.clone(),
|
||||
service_config: service_config.clone(),
|
||||
_gate_guard: gate_guard,
|
||||
@@ -1437,6 +1454,62 @@ impl TenantShard {
|
||||
.map(|(node_id, gen)| (node_id, Generation::new(gen)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Update the observed state of the tenant by applying incremental deltas
|
||||
///
|
||||
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
|
||||
/// They are then filtered in [`crate::service::Service::process_result`].
|
||||
pub(crate) fn apply_observed_deltas(
|
||||
&mut self,
|
||||
deltas: impl Iterator<Item = ObservedStateDelta>,
|
||||
) {
|
||||
for delta in deltas {
|
||||
match delta {
|
||||
ObservedStateDelta::Upsert(ups) => {
|
||||
let (node_id, loc) = *ups;
|
||||
|
||||
// If the generation of the observed location in the delta is lagging
|
||||
// behind the current one, then we have a race condition and cannot
|
||||
// be certain about the true observed state. Set the observed state
|
||||
// to None in order to reflect this.
|
||||
let crnt_gen = self
|
||||
.observed
|
||||
.locations
|
||||
.get(&node_id)
|
||||
.and_then(|loc| loc.conf.as_ref())
|
||||
.and_then(|conf| conf.generation);
|
||||
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
|
||||
match (crnt_gen, new_gen) {
|
||||
(Some(crnt), Some(new)) if crnt_gen > new_gen => {
|
||||
tracing::warn!(
|
||||
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
|
||||
node_id, loc, crnt, new
|
||||
);
|
||||
|
||||
self.observed
|
||||
.locations
|
||||
.insert(node_id, ObservedStateLocation { conf: None });
|
||||
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
|
||||
self.observed.locations.insert(node_id, loc);
|
||||
}
|
||||
ObservedStateDelta::Delete(node_id) => {
|
||||
tracing::info!("Deleting observed location {}", node_id);
|
||||
self.observed.locations.remove(&node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -7,7 +7,6 @@ import json
|
||||
import os
|
||||
import re
|
||||
import timeit
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -25,7 +24,8 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonPageserver
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Callable, ClassVar, Optional
|
||||
from collections.abc import Iterator, Mapping
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
"""
|
||||
@@ -141,6 +141,28 @@ class PgBenchRunResult:
|
||||
)
|
||||
|
||||
|
||||
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
|
||||
#
|
||||
# This used to be a class variable on PgBenchInitResult. However later versions
|
||||
# of Python complain:
|
||||
#
|
||||
# ValueError: mutable default <class 'dict'> for field EXTRACTORS is not allowed: use default_factory
|
||||
#
|
||||
# When you do what the error tells you to do, it seems to fail our Python 3.9
|
||||
# test environment. So let's just move it to a private module constant, and move
|
||||
# on.
|
||||
_PGBENCH_INIT_EXTRACTORS: Mapping[str, re.Pattern[str]] = {
|
||||
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
|
||||
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
|
||||
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
|
||||
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
|
||||
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
|
||||
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
|
||||
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
|
||||
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
|
||||
}
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PgBenchInitResult:
|
||||
total: Optional[float]
|
||||
@@ -155,20 +177,6 @@ class PgBenchInitResult:
|
||||
start_timestamp: int
|
||||
end_timestamp: int
|
||||
|
||||
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
|
||||
EXTRACTORS: ClassVar[dict[str, re.Pattern[str]]] = dataclasses.field(
|
||||
default_factory=lambda: {
|
||||
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
|
||||
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
|
||||
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
|
||||
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
|
||||
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
|
||||
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
|
||||
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
|
||||
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
|
||||
}
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def parse_from_stderr(
|
||||
cls,
|
||||
@@ -185,7 +193,7 @@ class PgBenchInitResult:
|
||||
timings: dict[str, Optional[float]] = {}
|
||||
last_line_items = re.split(r"\(|\)|,", last_line)
|
||||
for item in last_line_items:
|
||||
for key, regex in cls.EXTRACTORS.items():
|
||||
for key, regex in _PGBENCH_INIT_EXTRACTORS.items():
|
||||
if (m := regex.match(item.strip())) is not None:
|
||||
if key in timings:
|
||||
raise RuntimeError(
|
||||
|
||||
@@ -23,3 +23,8 @@ class EndpointHttpClient(requests.Session):
|
||||
res = self.get(f"http://localhost:{self.port}/database_schema?database={database}")
|
||||
res.raise_for_status()
|
||||
return res.text
|
||||
|
||||
def installed_extensions(self):
|
||||
res = self.get(f"http://localhost:{self.port}/installed_extensions")
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
@@ -446,7 +446,6 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
if immediate:
|
||||
cmd.extend(["-m", "immediate"])
|
||||
|
||||
log.info(f"Stopping pageserver with {cmd}")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def safekeeper_start(
|
||||
|
||||
@@ -395,7 +395,7 @@ class NeonEnvBuilder:
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]] = None,
|
||||
safekeeper_extra_opts: Optional[list[str]] = None,
|
||||
storage_controller_port_override: Optional[int] = None,
|
||||
pageserver_io_buffer_alignment: Optional[int] = None,
|
||||
pageserver_virtual_file_io_mode: Optional[str] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -449,7 +449,7 @@ class NeonEnvBuilder:
|
||||
|
||||
self.storage_controller_port_override = storage_controller_port_override
|
||||
|
||||
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
|
||||
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
@@ -1038,7 +1038,7 @@ class NeonEnv:
|
||||
|
||||
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
|
||||
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
|
||||
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
|
||||
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1102,7 +1102,8 @@ class NeonEnv:
|
||||
for key, value in override.items():
|
||||
ps_cfg[key] = value
|
||||
|
||||
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
@@ -1407,7 +1408,7 @@ def neon_simple_env(
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
pageserver_virtual_file_io_mode: Optional[str],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
@@ -1433,7 +1434,7 @@ def neon_simple_env(
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
@@ -1457,7 +1458,7 @@ def neon_env_builder(
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]],
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
record_property: Callable[[str, object], None],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
pageserver_virtual_file_io_mode: Optional[str],
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
"""
|
||||
Fixture to create a Neon environment for test.
|
||||
@@ -1492,7 +1493,7 @@ def neon_env_builder(
|
||||
test_overlay_dir=test_overlay_dir,
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
|
||||
) as builder:
|
||||
yield builder
|
||||
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
|
||||
@@ -3914,7 +3915,6 @@ class Safekeeper(LogUtils):
|
||||
return self
|
||||
|
||||
def stop(self, immediate: bool = False) -> Safekeeper:
|
||||
log.info(f"Stopping safekeeper {self.id}")
|
||||
self.env.neon_cli.safekeeper_stop(self.id, immediate)
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
@@ -41,8 +41,8 @@ def pageserver_virtual_file_io_engine() -> Optional[str]:
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pageserver_io_buffer_alignment() -> Optional[int]:
|
||||
return None
|
||||
def pageserver_virtual_file_io_mode() -> Optional[str]:
|
||||
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
|
||||
@@ -5,13 +5,13 @@ import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Union
|
||||
|
||||
import boto3
|
||||
import toml
|
||||
from moto.server import ThreadedMotoServer
|
||||
from mypy_boto3_s3 import S3Client
|
||||
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
@@ -43,7 +43,6 @@ class RemoteStorageUser(str, enum.Enum):
|
||||
class MockS3Server:
|
||||
"""
|
||||
Starts a mock S3 server for testing on a port given, errors if the server fails to start or exits prematurely.
|
||||
Relies that `poetry` and `moto` server are installed, since it's the way the tests are run.
|
||||
|
||||
Also provides a set of methods to derive the connection properties from and the method to kill the underlying server.
|
||||
"""
|
||||
@@ -53,22 +52,8 @@ class MockS3Server:
|
||||
port: int,
|
||||
):
|
||||
self.port = port
|
||||
|
||||
# XXX: do not use `shell=True` or add `exec ` to the command here otherwise.
|
||||
# We use `self.subprocess.kill()` to shut down the server, which would not "just" work in Linux
|
||||
# if a process is started from the shell process.
|
||||
self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", f"-p{port}"])
|
||||
error = None
|
||||
try:
|
||||
return_code = self.subprocess.poll()
|
||||
if return_code is not None:
|
||||
error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
|
||||
except Exception as e:
|
||||
error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
|
||||
if error is not None:
|
||||
log.error(error)
|
||||
self.kill()
|
||||
raise RuntimeError("failed to start s3 mock server")
|
||||
self.server = ThreadedMotoServer(port=port)
|
||||
self.server.start()
|
||||
|
||||
def endpoint(self) -> str:
|
||||
return f"http://127.0.0.1:{self.port}"
|
||||
@@ -83,7 +68,7 @@ class MockS3Server:
|
||||
return "test"
|
||||
|
||||
def kill(self):
|
||||
self.subprocess.kill()
|
||||
self.server.stop()
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -192,7 +192,7 @@ def tpch_queuies() -> tuple[ParameterSet, ...]:
|
||||
- querues in returning tuple are ordered by the query number
|
||||
- pytest parameters id is adjusted to match the query id (the numbering starts from 1)
|
||||
"""
|
||||
queries_dir = Path(__file__).parent / "performance" / "tpc-h" / "queries"
|
||||
queries_dir = Path(__file__).parent / "tpc-h" / "queries"
|
||||
assert queries_dir.exists(), f"TPC-H queries dir not found: {queries_dir}"
|
||||
|
||||
return tuple(
|
||||
|
||||
87
test_runner/regress/test_installed_extensions.py
Normal file
87
test_runner/regress/test_installed_extensions.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from logging import info
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
"""basic test for the endpoint that returns the list of installed extensions"""
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.create_branch("test_installed_extensions")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_installed_extensions")
|
||||
|
||||
endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
|
||||
endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")
|
||||
|
||||
client = endpoint.http_client()
|
||||
res = client.installed_extensions()
|
||||
|
||||
info("Extensions list: %s", res)
|
||||
info("Extensions: %s", res["extensions"])
|
||||
# 'plpgsql' is a default extension that is always installed.
|
||||
assert any(
|
||||
ext["extname"] == "plpgsql" and ext["versions"] == ["1.0"] for ext in res["extensions"]
|
||||
), "The 'plpgsql' extension is missing"
|
||||
|
||||
# check that the neon_test_utils extension is not installed
|
||||
assert not any(
|
||||
ext["extname"] == "neon_test_utils" for ext in res["extensions"]
|
||||
), "The 'neon_test_utils' extension is installed"
|
||||
|
||||
pg_conn = endpoint.connect(dbname="test_installed_extensions")
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
cur.execute(
|
||||
"SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
|
||||
)
|
||||
res = cur.fetchone()
|
||||
neon_test_utils_version = res[0]
|
||||
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION neon version '1.1'")
|
||||
|
||||
pg_conn_2 = endpoint.connect(dbname="test_installed_extensions_2")
|
||||
with pg_conn_2.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION neon version '1.2'")
|
||||
|
||||
res = client.installed_extensions()
|
||||
|
||||
info("Extensions list: %s", res)
|
||||
info("Extensions: %s", res["extensions"])
|
||||
|
||||
# check that the neon_test_utils extension is installed only in 1 database
|
||||
# and has the expected version
|
||||
assert any(
|
||||
ext["extname"] == "neon_test_utils"
|
||||
and ext["versions"] == [neon_test_utils_version]
|
||||
and ext["n_databases"] == 1
|
||||
for ext in res["extensions"]
|
||||
)
|
||||
|
||||
# check that the plpgsql extension is installed in all databases
|
||||
# this is a default extension that is always installed
|
||||
assert any(ext["extname"] == "plpgsql" and ext["n_databases"] == 4 for ext in res["extensions"])
|
||||
|
||||
# check that the neon extension is installed and has expected versions
|
||||
for ext in res["extensions"]:
|
||||
if ext["extname"] == "neon":
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.1", "1.2"]
|
||||
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("ALTER EXTENSION neon UPDATE TO '1.3'")
|
||||
|
||||
res = client.installed_extensions()
|
||||
|
||||
info("Extensions list: %s", res)
|
||||
info("Extensions: %s", res["extensions"])
|
||||
|
||||
# check that the neon_test_utils extension is updated
|
||||
for ext in res["extensions"]:
|
||||
if ext["extname"] == "neon":
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.2", "1.3"]
|
||||
@@ -666,14 +666,17 @@ def test_upgrade_generationless_local_file_paths(
|
||||
pageserver.stop()
|
||||
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
files_renamed = 0
|
||||
log.info(f"Renaming files in {timeline_dir}")
|
||||
for filename in os.listdir(timeline_dir):
|
||||
path = os.path.join(timeline_dir, filename)
|
||||
log.info(f"Found file {path}")
|
||||
if path.endswith("-v1-00000001"):
|
||||
new_path = path[:-12]
|
||||
os.rename(path, new_path)
|
||||
log.info(f"Renamed {path} -> {new_path}")
|
||||
if filename.endswith("-v1-00000001"):
|
||||
new_filename = filename[:-12]
|
||||
os.rename(
|
||||
os.path.join(timeline_dir, filename), os.path.join(timeline_dir, new_filename)
|
||||
)
|
||||
log.info(f"Renamed {filename} -> {new_filename}")
|
||||
files_renamed += 1
|
||||
else:
|
||||
log.info(f"Keeping {filename}")
|
||||
|
||||
assert files_renamed > 0
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, List
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
@@ -735,7 +735,7 @@ class ProposerPostgres(PgProtocol):
|
||||
"""Path to postgresql.conf"""
|
||||
return os.path.join(self.pgdata_dir, "postgresql.conf")
|
||||
|
||||
def create_dir_config(self, safekeepers: str, additional_config_options: Optional[dict[str,str]] = None):
|
||||
def create_dir_config(self, safekeepers: str):
|
||||
"""Create dir and config for running --sync-safekeepers"""
|
||||
|
||||
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
|
||||
@@ -750,9 +750,6 @@ class ProposerPostgres(PgProtocol):
|
||||
f"listen_addresses = '{self.listen_addr}'\n",
|
||||
f"port = '{self.port}'\n",
|
||||
]
|
||||
if additional_config_options:
|
||||
for key, value in additional_config_options.items():
|
||||
cfg.append(f"{key} = '{value}'\n")
|
||||
|
||||
f.writelines(cfg)
|
||||
|
||||
@@ -1449,7 +1446,6 @@ class SafekeeperEnv:
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
num_safekeepers: int = 1,
|
||||
pg_conf_options: Optional[dict[str, str]] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.port_distributor = port_distributor
|
||||
@@ -1461,7 +1457,6 @@ class SafekeeperEnv:
|
||||
self.postgres: Optional[ProposerPostgres] = None
|
||||
self.tenant_id: Optional[TenantId] = None
|
||||
self.timeline_id: Optional[TimelineId] = None
|
||||
self.pg_conf_options = pg_conf_options
|
||||
|
||||
def init(self) -> SafekeeperEnv:
|
||||
assert self.postgres is None, "postgres is already initialized"
|
||||
@@ -1504,7 +1499,6 @@ class SafekeeperEnv:
|
||||
str(i),
|
||||
"--broker-endpoint",
|
||||
self.fake_broker_endpoint,
|
||||
# "--no-sync",
|
||||
]
|
||||
log.info(f'Running command "{" ".join(cmd)}"')
|
||||
|
||||
@@ -1539,7 +1533,7 @@ class SafekeeperEnv:
|
||||
self.port_distributor.get_port(),
|
||||
)
|
||||
pg.initdb()
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs(), self.pg_conf_options)
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs())
|
||||
return pg
|
||||
|
||||
def kill_safekeeper(self, sk_dir):
|
||||
@@ -1564,224 +1558,6 @@ class SafekeeperEnv:
|
||||
self.kill_safekeeper(sk_proc.args[6])
|
||||
|
||||
|
||||
def run_pg_restore(pg_bin: PgBin, dump_file: Path, postgres: ProposerPostgres, table_names: List[str]):
|
||||
# env_vars = {
|
||||
# "PGOPTIONS": "-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7",
|
||||
# }
|
||||
|
||||
postgres.connstr
|
||||
|
||||
pg_restore_command: List[str] = [
|
||||
"pg_restore",
|
||||
"-v", # Verbose output
|
||||
"-d", postgres.connstr(options='-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7 -cstatement_timeout=0'), # Target database
|
||||
"--no-owner", # Do not restore ownership
|
||||
"--jobs=4", # Number of parallel jobs
|
||||
str(dump_file), # Dump file
|
||||
]
|
||||
|
||||
# Add table names to the command
|
||||
for table in table_names:
|
||||
pg_restore_command.insert(-2, "-t")
|
||||
pg_restore_command.insert(-2, table)
|
||||
|
||||
pg_bin.run(pg_restore_command)
|
||||
return None
|
||||
|
||||
|
||||
def download_pg_dump(database_name: str) -> Path:
|
||||
dump_file_path = Path(f"/tmp/{database_name}.pg_dump")
|
||||
if not dump_file_path.exists():
|
||||
s3_path = f"s3://neon-github-dev/performance/pgdumps/{database_name}/{database_name}.pg_dump"
|
||||
try:
|
||||
log.info(f"Downloading {s3_path} to {dump_file_path}")
|
||||
subprocess.run(["aws", "s3", "cp", s3_path, str(dump_file_path)], check=True)
|
||||
except subprocess.CalledProcessError:
|
||||
log.error("Failed to download the pg_dump file. Ensure AWS S3 credentials are set in the environment variables.")
|
||||
raise
|
||||
return dump_file_path
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
# Download the pg_dump file if it doesn't exist
|
||||
database_name = "clickbench" # Replace with your database name
|
||||
dump_file_path = download_pg_dump(database_name)
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before restore: {size_before}")
|
||||
|
||||
start_time = time.time()
|
||||
run_pg_restore(pg_bin, dump_file_path, env.postgres, ["hits"])
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after restore: {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"pg_restore duration: {duration:.2f} seconds")
|
||||
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore_tpch(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
# Download the pg_dump file if it doesn't exist
|
||||
database_name = "tpch" # Replace with your database name
|
||||
dump_file_path = download_pg_dump(database_name)
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before restore: {size_before}")
|
||||
|
||||
table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
|
||||
start_time = time.time()
|
||||
run_pg_restore(pg_bin, dump_file_path, env.postgres, table_names)
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after restore: {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"pg_restore duration: {duration:.2f} seconds")
|
||||
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_waltest(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
wal_test = """
|
||||
CREATE OR REPLACE FUNCTION public.wal_bandwidth_test()
|
||||
RETURNS text
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
declare
|
||||
i int;
|
||||
j int;
|
||||
lastlsn pg_lsn;
|
||||
nowlsn pg_lsn;
|
||||
lastts timestamp;
|
||||
nowts timestamp;
|
||||
bandwidth numeric;
|
||||
|
||||
result text := ''; -- Initialize an empty string to capture messages
|
||||
begin
|
||||
for i in 1..100 loop
|
||||
|
||||
lastlsn = pg_current_wal_insert_lsn();
|
||||
lastts = clock_timestamp();
|
||||
|
||||
-- Emit 100 MB of WAL
|
||||
for j in 1..10000 loop
|
||||
perform pg_logical_emit_message(false, '', repeat('x', 10486));
|
||||
end loop;
|
||||
|
||||
nowlsn = pg_current_wal_insert_lsn();
|
||||
nowts = clock_timestamp();
|
||||
|
||||
bandwidth = (nowlsn - lastlsn) / (extract(epoch from nowts) - extract(epoch from lastts));
|
||||
|
||||
-- Capture the message instead of raising a notice
|
||||
result := result || format('bandwidth: %s kB / s%s',
|
||||
lpad(round(bandwidth / 1024)::text, 10),
|
||||
chr(10)); -- Newline for formatting
|
||||
|
||||
end loop;
|
||||
|
||||
return result; -- Return the concatenated string of messages
|
||||
end;
|
||||
$function$
|
||||
;
|
||||
"""
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before test: {size_before}")
|
||||
#env.postgres.safe_psql("create extension neon;")
|
||||
env.postgres.safe_psql(wal_test)
|
||||
start_time = time.time()
|
||||
output = env.postgres.safe_psql("""
|
||||
SET statement_timeout = 0;
|
||||
select wal_bandwidth_test();
|
||||
""")[0][0]
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
log.info(output)
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after test (irrelevant because no real WAL records, no relations): {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"test duration: {duration:.2f} seconds")
|
||||
log.info(f"Average ingest rate(irrelevant because no real WAL records, no relations): {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
@@ -2538,12 +2314,12 @@ def test_s3_eviction(
|
||||
]
|
||||
if delete_offloaded_wal:
|
||||
neon_env_builder.safekeeper_extra_opts.append("--delete-offloaded-wal")
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"checkpoint_timeout": "100ms",
|
||||
}
|
||||
)
|
||||
# make lagging_wal_timeout small to force pageserver quickly forget about
|
||||
# safekeeper after it stops sending updates (timeline is deactivated) to
|
||||
# make test faster. Won't be needed with
|
||||
# https://github.com/neondatabase/neon/issues/8148 fixed.
|
||||
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
|
||||
|
||||
n_timelines = 5
|
||||
|
||||
@@ -2631,9 +2407,37 @@ def test_s3_eviction(
|
||||
and sk.log_contains("successfully restored evicted timeline")
|
||||
for sk in env.safekeepers
|
||||
)
|
||||
|
||||
assert event_metrics_seen
|
||||
|
||||
# test safekeeper_evicted_timelines metric
|
||||
log.info("testing safekeeper_evicted_timelines metric")
|
||||
# checkpoint pageserver to force remote_consistent_lsn update
|
||||
for i in range(n_timelines):
|
||||
ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
|
||||
for ep in endpoints:
|
||||
log.info(ep.is_running())
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# all timelines must be evicted eventually
|
||||
def all_evicted():
|
||||
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
|
||||
assert n_evicted # make mypy happy
|
||||
assert int(n_evicted) == n_timelines
|
||||
|
||||
wait_until(60, 0.5, all_evicted)
|
||||
# restart should preserve the metric value
|
||||
sk.stop().start()
|
||||
wait_until(60, 0.5, all_evicted)
|
||||
# and endpoint start should reduce is
|
||||
endpoints[0].start()
|
||||
|
||||
def one_unevicted():
|
||||
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
|
||||
assert n_evicted # make mypy happy
|
||||
assert int(n_evicted) < n_timelines
|
||||
|
||||
wait_until(60, 0.5, one_unevicted)
|
||||
|
||||
|
||||
# Test resetting uploaded partial segment state.
|
||||
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -58,6 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] }
|
||||
prost = { version = "0.13", features = ["prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -66,7 +67,7 @@ regex-syntax = { version = "0.8" }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
serde_json = { version = "1", features = ["raw_value"] }
|
||||
serde_json = { version = "1", features = ["alloc", "raw_value"] }
|
||||
sha2 = { version = "0.10", features = ["asm", "oid"] }
|
||||
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
|
||||
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
||||
@@ -76,6 +77,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
|
||||
tikv-jemalloc-sys = { version = "0.5" }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
|
||||
Reference in New Issue
Block a user