Compare commits

..

16 Commits

Author SHA1 Message Date
Jere Vaara
d6c661ccb6 add missing string to format 2024-10-11 09:20:46 +03:00
David Gomes
0abf0f6dce quick test 2024-10-11 07:48:45 +02:00
Tristan Partin
878135fe9c Move PgBenchInitResult.EXTRACTORS to a private module constant
This seems to paper over a behavioral difference in Python 3.9 and
Python 3.12 with how dataclasses work with mutable variables. On Python
3.12, I get the following error:

ValueError: mutable default <class 'dict'> for field EXTRACTORS is not allowed: use default_factory

This obviously doesn't occur in our testing environment. When I do what
the error tells me, EXTRACTORS doesn't seem to exist as an attribute on
the class in at least Python 3.9.

The solution provided in this commit seems like the least amount of
friction to keep the wheels turning.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-09 14:02:09 -05:00
Conrad Ludgate
75434060a5 local_proxy: integrate with pg_session_jwt extension (#9086) 2024-10-09 18:24:10 +01:00
Anastasia Lubennikova
721803a0e7 Add partial support of extensions for v17: (#9322)
- PostGIS 3.5.0
- pgrouting 3.6.2
- h3 4.1.3
- unit 7.9
- pgjwt version (f3d82fd)
- pg_hashids 1.2.1
- ip4r 2.4.2
- prefix 1.2.10
- postgresql-hll 2.18
- pg_roaringbitmap 0.5.4
- pg-semver 0.40.0

update support of extensions for v14-v16:
- unit 7.7 -> 7.9
- pgjwt 9742dab -> f3d82fd

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-10-09 17:07:59 +01:00
Fedor Dikarev
108a211917 added workflow Report Workflow Stats (#9330)
## Summary of changes
CI: Collect stats for Github Workflows Runs
2024-10-09 17:27:41 +02:00
Heikki Linnakangas
72ef0e0fa1 tests: Remove redundant log lines when stopping storage nodes (#9317)
The neon_cli functions print the command that gets executed, which
contains the same information.

Before:

    2024-10-07 22:32:28.884 INFO [neon_fixtures.py:3927] Stopping safekeeper 1
    2024-10-07 22:32:28.884 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 1"
    2024-10-07 22:32:28.989 INFO [neon_fixtures.py:3927] Stopping safekeeper 2
    2024-10-07 22:32:28.989 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 2"
    2024-10-07 22:32:29.93 INFO [neon_fixtures.py:3927] Stopping safekeeper 3
    2024-10-07 22:32:29.94 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 3"
    2024-10-07 22:32:29.251 INFO [neon_cli.py:450] Stopping pageserver with ['pageserver', 'stop', '--id=1']
    2024-10-07 22:32:29.251 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local pageserver stop --id=1"

After:

    2024-10-07 22:32:28.884 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 1"
    2024-10-07 22:32:28.989 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 2"
    2024-10-07 22:32:29.94 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local safekeeper stop 3"
    2024-10-07 22:32:29.251 INFO [neon_cli.py:73] Running command "/tmp/neon/bin/neon_local pageserver stop --id=1"
2024-10-09 15:51:34 +03:00
Heikki Linnakangas
eb23d355a9 tests: Use ThreadedMotoServer python class to launch mock S3 server (#9313)
This is simpler than using subprocess.

One difference is in how moto's log output is now collected. Previously,
moto's logs went to stderr, and were collected and printed at the end of
the test by pytest, like this:

    2024-10-07T22:45:12.3705222Z ----------------------------- Captured stderr call -----------------------------
    2024-10-07T22:45:12.3705577Z 127.0.0.1 - - [07/Oct/2024 22:35:14] "PUT /pageserver-test-deletion-queue-2e6efa8245ec92a37a07004569c29eb7 HTTP/1.1" 200 -
    2024-10-07T22:45:12.3706181Z 127.0.0.1 - - [07/Oct/2024 22:35:15] "GET /pageserver-test-deletion-queue-2e6efa8245ec92a37a07004569c29eb7/?list-type=2&delimiter=/&prefix=/tenants/43da25eac0f41412696dd31b94dbb83c/timelines/ HTTP/1.1" 200 -
    2024-10-07T22:45:12.3706894Z 127.0.0.1 - - [07/Oct/2024 22:35:16] "PUT /pageserver-test-deletion-queue-2e6efa8245ec92a37a07004569c29eb7//tenants/43da25eac0f41412696dd31b94dbb83c/timelines/eabba5f0c1c72c8656d3ef1d85b98c1d/initdb.tar.zst?x-id=PutObject HTTP/1.1" 200 -

Note the timestamps: the timestamp at the beginning of the line is the
time that the stderr was dumped, i.e. the end of the test, which makes
those timestamps rather useless. The timestamp in the middle of the line
is when the operation actually happened, but it has only 1 s
granularity.

With this change, moto's log lines are printed in the "live log call"
section, as they happen, which makes the timestamps more useful:

    2024-10-08 12:12:31.129 INFO [_internal.py:97] 127.0.0.1 - - [08/Oct/2024 12:12:31] "GET /pageserver-test-deletion-queue-e24e7525d437e1874d8a52030dcabb4f/?list-type=2&delimiter=/&prefix=/tenants/7b6a16b1460eda5204083fba78bc360f/timelines/ HTTP/1.1" 200 -
    2024-10-08 12:12:32.612 INFO [_internal.py:97] 127.0.0.1 - - [08/Oct/2024 12:12:32] "PUT /pageserver-test-deletion-queue-e24e7525d437e1874d8a52030dcabb4f//tenants/7b6a16b1460eda5204083fba78bc360f/timelines/7ab4c2b67fa8c712cada207675139877/initdb.tar.zst?x-id=PutObject HTTP/1.1" 200 -
2024-10-09 15:34:51 +03:00
Yuchen Liang
bee04b8a69 pageserver: add direct io config to virtual file (#9214)
## Problem
We need a way to incrementally switch to direct IO. During the rollout
we might want to switch to O_DIRECT on image and delta layer read path
first before others.

## Summary of changes
- Revisited and simplified direct io config in `PageserverConf`. 
- We could add a fallback mode for open, but for read there isn't a
reasonable alternative (without creating another buffered virtual file).
- Added a wrapper around `VirtualFile`, current implementation become
`VirtualFileInner`
- Use `open_v2`, `create_v2`, `open_with_options_v2` when we want to use
the IO mode specified in PS config.
- Once we onboard all IO through VirtualFile using this new API, we will
delete the old code path.
- Make io mode live configurable for benchmarking.
- Only guaranteed for files opened after the config change, so do it
before the experiment.

As an example, we are using `open_v2` with
`virtual_file::IoMode::Direct` in
https://github.com/neondatabase/neon/pull/9169

We also remove `io_buffer_alignment` config in
a04cfd754b and use it as a compile time
constant. This way we don't have to carry the alignment around or make
frequent call to retrieve this information from the static variable.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 08:33:07 -04:00
Anastasia Lubennikova
63e7fab990 Add /installed_extensions endpoint to collect statistics about extension usage. (#8917)
Add /installed_extensions endpoint to collect
statistics about extension usage.
It returns a list of installed extensions in the format:

```json
{
  "extensions": [
    {
      "extname": "extension_name",
      "versions": ["1.0", "1.1"],
      "n_databases": 5,
    }
  ]
}
```

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-10-09 13:32:13 +01:00
Arseny Sher
a181392738 safekeeper: add evicted_timelines gauge. (#9318)
showing total number of evicted timelines.
2024-10-09 14:40:30 +03:00
Alexander Bayandin
fc7397122c test_runner: fix path to tpc-h queries (#9327)
## Problem

The path to TPC-H queries was incorrectly changed in #9306.
This path is used for `test_tpch` parameterization, so all perf tests
started to fail:

```
==================================== ERRORS ====================================
__________ ERROR collecting test_runner/performance/test_perf_olap.py __________
test_runner/performance/test_perf_olap.py:205: in <module>
    @pytest.mark.parametrize("query", tpch_queuies())
test_runner/performance/test_perf_olap.py:196: in tpch_queuies
    assert queries_dir.exists(), f"TPC-H queries dir not found: {queries_dir}"
E   AssertionError: TPC-H queries dir not found: /__w/neon/neon/test_runner/performance/performance/tpc-h/queries
E   assert False
E    +  where False = <bound method Path.exists of PosixPath('/__w/neon/neon/test_runner/performance/performance/tpc-h/queries')>()
E    +    where <bound method Path.exists of PosixPath('/__w/neon/neon/test_runner/performance/performance/tpc-h/queries')> = PosixPath('/__w/neon/neon/test_runner/performance/performance/tpc-h/queries').exists
```

## Summary of changes
- Fix the path to tpc-h queries
2024-10-09 12:11:06 +01:00
Vlad Lazar
cc599e23c1 storcon: make observed state updates more granular (#9276)
## Problem

Previously, observed state updates from the reconciler may have
clobbered inline changes made to the observed state by other code paths.

## Summary of changes

Model observed state changes from reconcilers as deltas. This means that
we only update what has changed. Handling for node going off-line concurrently
during the reconcile is also added: set observed state to None in such cases to
respect the convention.

Closes https://github.com/neondatabase/neon/issues/9124
2024-10-09 11:53:29 +01:00
Folke Behrens
54d1185789 proxy: Unalias hyper1 and replace one use of hyper0 in test (#9324)
Leaves one final use of hyper0 in proxy for the health service,
which requires some coordinated effort with other services.
2024-10-09 12:44:17 +02:00
Heikki Linnakangas
8a138db8b7 tests: Reduce noise from logging renamed files (#9315)
Instead of printing the full absolute path for every file, print just
the filenames.

Before:

    2024-10-08 13:19:39.98 INFO [test_pageserver_generations.py:669] Found file /home/heikki/git-sandbox/neon/test_output/test_upgrade_generationless_local_file_paths[debug-pg16]/repo/pageserver_1/tenants/0c04a8df7691a367ad0bb1cc1373ba4d/timelines/f41022551e5f96ce8dbefb9b5d35ab45/000000067F0000000100000A8D0100000000-000000067F0000000100000AC10000000002__00000000014F16F0-v1-00000001
    2024-10-08 13:19:39.99 INFO [test_pageserver_generations.py:673] Renamed /home/heikki/git-sandbox/neon/test_output/test_upgrade_generationless_local_file_paths[debug-pg16]/repo/pageserver_1/tenants/0c04a8df7691a367ad0bb1cc1373ba4d/timelines/f41022551e5f96ce8dbefb9b5d35ab45/000000067F0000000100000A8D0100000000-000000067F0000000100000AC10000000002__00000000014F16F0-v1-00000001 -> /home/heikki/git-sandbox/neon/test_output/test_upgrade_generationless_local_file_paths[debug-pg16]/repo/pageserver_1/tenants/0c04a8df7691a367ad0bb1cc1373ba4d/timelines/f41022551e5f96ce8dbefb9b5d35ab45/000000067F0000000100000A8D0100000000-000000067F0000000100000AC10000000002__00000000014F16F0

After:

    2024-10-08 13:24:39.726 INFO [test_pageserver_generations.py:667] Renaming files in /home/heikki/git-sandbox/neon/test_output/test_upgrade_generationless_local_file_paths[debug-pg16]/repo/pageserver_1/tenants/3439538816c520adecc541cc8b1de21c/timelines/6a7be8ee707b355de48dd91b326d6ae1
    2024-10-08 13:24:39.728 INFO [test_pageserver_generations.py:673] Renamed
000000067F0000000100000A8D0100000000-000000067F0000000100000AC10000000002__00000000014F16F0-v1-00000001 -> 000000067F0000000100000A8D0100000000-000000067F0000000100000AC10000000002__00000000014F16F0
2024-10-09 10:55:56 +01:00
Erik Grinaker
211970f0e0 remote_storage: add DownloadOpts::byte_(start|end) (#9293)
`download_byte_range()` is basically a copy of `download()` with an
additional option passed to the backend SDKs. This can cause these code
paths to diverge, and prevents combining various options.

This patch adds `DownloadOpts::byte_(start|end)` and move byte range
handling into `download()`.
2024-10-09 10:29:06 +01:00
70 changed files with 2104 additions and 964 deletions

View File

@@ -0,0 +1,41 @@
name: Report Workflow Stats
on:
workflow_run:
workflows:
- Add `external` label to issues and PRs created by external users
- Benchmarking
- Build and Test
- Build and Test Locally
- Build build-tools image
- Check Permissions
- Check build-tools image
- Check neon with extra platform builds
- Cloud Regression Test
- Create Release Branch
- Handle `approved-for-ci-run` label
- Lint GitHub Workflows
- Notify Slack channel about upcoming release
- Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
- Pin build-tools image
- Prepare benchmarking databases by restoring dumps
- Push images to ACR
- Test Postgres client libraries
- Trigger E2E Tests
- cleanup caches by a branch
types: [completed]
jobs:
gh-workflow-stats:
name: Github Workflow Stats
runs-on: ubuntu-22.04
permissions:
actions: read
steps:
- name: Export GH Workflow Stats
uses: fedordikarev/gh-workflow-stats-action@v0.1.2
with:
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
DB_TABLE: "gh_workflow_stats_neon"
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GH_RUN_ID: ${{ github.event.workflow_run.id }}

18
Cargo.lock generated
View File

@@ -1820,6 +1820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47"
dependencies = [
"base16ct 0.2.0",
"base64ct",
"crypto-bigint 0.5.5",
"digest",
"ff 0.13.0",
@@ -1829,6 +1830,8 @@ dependencies = [
"pkcs8 0.10.2",
"rand_core 0.6.4",
"sec1 0.7.3",
"serde_json",
"serdect",
"subtle",
"zeroize",
]
@@ -4037,6 +4040,8 @@ dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"serde",
"serde_json",
]
[[package]]
@@ -5256,6 +5261,7 @@ dependencies = [
"der 0.7.8",
"generic-array",
"pkcs8 0.10.2",
"serdect",
"subtle",
"zeroize",
]
@@ -5510,6 +5516,16 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "serdect"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a84f14a19e9a014bb9f4512488d9829a68e04ecabffb0f9904cd1ace94598177"
dependencies = [
"base16ct 0.2.0",
"serde",
]
[[package]]
name = "sha1"
version = "0.10.5"
@@ -7302,6 +7318,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"postgres-types",
"prettyplease",
"proc-macro2",
"prost",
@@ -7326,6 +7343,7 @@ dependencies = [
"time",
"time-macros",
"tokio",
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit",

View File

@@ -109,13 +109,30 @@ RUN apt update && \
libcgal-dev libgdal-dev libgmp-dev libmpfr-dev libopenscenegraph-dev libprotobuf-c-dev \
protobuf-c-compiler xsltproc
# Postgis 3.5.0 requires SFCGAL 1.4+
#
# It would be nice to update all versions together, but we must solve the SFCGAL dependency first.
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /sfcgal && \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
# and also we must check backward compatibility with older versions of PostGIS.
#
# Use new version only for v17
RUN case "${PG_VERSION}" in \
"v17") \
export SFCGAL_VERSION=1.4.1 \
export SFCGAL_CHECKSUM=1800c8a26241588f11cddcf433049e9b9aea902e923414d2ecef33a3295626c3 \
;; \
"v14" | "v15" | "v16") \
export SFCGAL_VERSION=1.3.10 \
export SFCGAL_CHECKSUM=4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
mkdir -p /sfcgal && \
wget https://gitlab.com/sfcgal/SFCGAL/-/archive/v${SFCGAL_VERSION}/SFCGAL-v${SFCGAL_VERSION}.tar.gz -O SFCGAL.tar.gz && \
echo "${SFCGAL_CHECKSUM} SFCGAL.tar.gz" | sha256sum --check && \
mkdir sfcgal-src && cd sfcgal-src && tar xzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
@@ -123,15 +140,27 @@ RUN case "${PG_VERSION}" in "v17") \
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "Postgis doensn't yet support PG17 (needs 3.4.3, if not higher)" && exit 0;; \
# Postgis 3.5.0 supports v17
RUN case "${PG_VERSION}" in \
"v17") \
export POSTGIS_VERSION=3.5.0 \
export POSTGIS_CHECKSUM=ca698a22cc2b2b3467ac4e063b43a28413f3004ddd505bdccdd74c56a647f510 \
;; \
"v14" | "v15" | "v16") \
export POSTGIS_VERSION=3.3.3 \
export POSTGIS_CHECKSUM=74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://download.osgeo.org/postgis/source/postgis-3.3.3.tar.gz -O postgis.tar.gz && \
echo "74eb356e3f85f14233791013360881b6748f78081cc688ff9d6f0f673a762d13 postgis.tar.gz" | sha256sum --check && \
wget https://download.osgeo.org/postgis/source/postgis-${POSTGIS_VERSION}.tar.gz -O postgis.tar.gz && \
echo "${POSTGIS_CHECKSUM} postgis.tar.gz" | sha256sum --check && \
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
cd extensions/postgis && \
make clean && \
@@ -152,11 +181,27 @@ RUN case "${PG_VERSION}" in "v17") \
cp /usr/local/pgsql/share/extension/address_standardizer.control /extensions/postgis && \
cp /usr/local/pgsql/share/extension/address_standardizer_data_us.control /extensions/postgis
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
# Uses versioned libraries, i.e. libpgrouting-3.4
# and may introduce function signature changes between releases
# i.e. release 3.5.0 has new signature for pg_dijkstra function
#
# Use new version only for v17
# last release v3.6.2 - Mar 30, 2024
RUN case "${PG_VERSION}" in \
"v17") \
export PGROUTING_VERSION=3.6.2 \
export PGROUTING_CHECKSUM=f4a1ed79d6f714e52548eca3bb8e5593c6745f1bde92eb5fb858efd8984dffa2 \
;; \
"v14" | "v15" | "v16") \
export PGROUTING_VERSION=3.4.2 \
export PGROUTING_CHECKSUM=cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
wget https://github.com/pgRouting/pgrouting/archive/v${PGROUTING_VERSION}.tar.gz -O pgrouting.tar.gz && \
echo "${PGROUTING_CHECKSUM} pgrouting.tar.gz" | sha256sum --check && \
mkdir pgrouting-src && cd pgrouting-src && tar xzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && cd build && \
cmake -DCMAKE_BUILD_TYPE=Release .. && \
@@ -215,10 +260,9 @@ FROM build-deps AS h3-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
mkdir -p /h3/usr/ && \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
# not version-specific
# last release v4.1.0 - Jan 18, 2023
RUN mkdir -p /h3/usr/ && \
wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
mkdir h3-src && cd h3-src && tar xzf ../h3.tar.gz --strip-components=1 -C . && \
@@ -229,10 +273,9 @@ RUN case "${PG_VERSION}" in "v17") \
cp -R /h3/usr / && \
rm -rf build
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
# not version-specific
# last release v4.1.3 - Jul 26, 2023
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.3.tar.gz -O h3-pg.tar.gz && \
echo "5c17f09a820859ffe949f847bebf1be98511fb8f1bd86f94932512c00479e324 h3-pg.tar.gz" | sha256sum --check && \
mkdir h3-pg-src && cd h3-pg-src && tar xzf ../h3-pg.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
@@ -251,11 +294,10 @@ FROM build-deps AS unit-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
# not version-specific
# last release 7.9 - Sep 15, 2024
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.9.tar.gz -O postgresql-unit.tar.gz && \
echo "e46de6245dcc8b2c2ecf29873dbd43b2b346773f31dd5ce4b8315895a052b456 postgresql-unit.tar.gz" | sha256sum --check && \
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -302,12 +344,10 @@ FROM build-deps AS pgjwt-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
# not version-specific
# doesn't use releases, last commit f3d82fd - Mar 2, 2023
RUN wget https://github.com/michelp/pgjwt/archive/f3d82fd30151e754e19ce5d6a06c71c20689ce3d.tar.gz -O pgjwt.tar.gz && \
echo "dae8ed99eebb7593b43013f6532d772b12dfecd55548d2673f2dfd0163f6d2b9 pgjwt.tar.gz" | sha256sum --check && \
mkdir pgjwt-src && cd pgjwt-src && tar xzf ../pgjwt.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
@@ -342,10 +382,9 @@ FROM build-deps AS pg-hashids-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
# not version-specific
# last release v1.2.1 -Jan 12, 2018
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -405,10 +444,9 @@ FROM build-deps AS ip4r-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
# not version-specific
# last release v2.4.2 - Jul 29, 2023
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.2.tar.gz -O ip4r.tar.gz && \
echo "0f7b1f159974f49a47842a8ab6751aecca1ed1142b6d5e38d81b064b2ead1b4b ip4r.tar.gz" | sha256sum --check && \
mkdir ip4r-src && cd ip4r-src && tar xzf ../ip4r.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -425,10 +463,9 @@ FROM build-deps AS prefix-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
# not version-specific
# last release v1.2.10 - Jul 5, 2023
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.10.tar.gz -O prefix.tar.gz && \
echo "4342f251432a5f6fb05b8597139d3ccde8dcf87e8ca1498e7ee931ca057a8575 prefix.tar.gz" | sha256sum --check && \
mkdir prefix-src && cd prefix-src && tar xzf ../prefix.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -445,10 +482,9 @@ FROM build-deps AS hll-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
# not version-specific
# last release v2.18 - Aug 29, 2023
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar.gz -O hll.tar.gz && \
echo "e2f55a6f4c4ab95ee4f1b4a2b73280258c5136b161fe9d059559556079694f0e hll.tar.gz" | sha256sum --check && \
mkdir hll-src && cd hll-src && tar xzf ../hll.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -659,11 +695,10 @@ FROM build-deps AS pg-roaringbitmap-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v0.5.4 - Jun 28, 2022
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions is not supported yet by pg_roaringbitmap. Quit" && exit 0;; \
esac && \
wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
@@ -680,12 +715,27 @@ FROM build-deps AS pg-semver-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# Release 0.40.0 breaks backward compatibility with previous versions
# see release note https://github.com/theory/pg-semver/releases/tag/v0.40.0
# Use new version only for v17
#
# last release v0.40.0 - Jul 22, 2024
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pg_semver. Quit" && exit 0;; \
RUN case "${PG_VERSION}" in \
"v17") \
export SEMVER_VERSION=0.40.0 \
export SEMVER_CHECKSUM=3e50bcc29a0e2e481e7b6d2bc937cadc5f5869f55d983b5a1aafeb49f5425cfc \
;; \
"v14" | "v15" | "v16") \
export SEMVER_VERSION=0.32.1 \
export SEMVER_CHECKSUM=fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O pg_semver.tar.gz && \
echo "fbdaf7512026d62eec03fad8687c15ed509b6ba395bff140acd63d2e4fbe25d7 pg_semver.tar.gz" | sha256sum --check && \
wget https://github.com/theory/pg-semver/archive/refs/tags/v${SEMVER_VERSION}.tar.gz -O pg_semver.tar.gz && \
echo "${SEMVER_CHECKSUM} pg_semver.tar.gz" | sha256sum --check && \
mkdir pg_semver-src && cd pg_semver-src && tar xzf ../pg_semver.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \

View File

@@ -1484,6 +1484,28 @@ LIMIT 100",
info!("Pageserver config changed");
}
}
// Gather info about installed extensions
pub fn get_installed_extensions(&self) -> Result<()> {
let connstr = self.connstr.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
Ok(())
}
}
pub fn forward_termination_signal() {

View File

@@ -165,6 +165,32 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// get the list of installed extensions
// currently only used in python tests
// TODO: call it from cplane
(&Method::GET, "/installed_extensions") => {
info!("serving /installed_extensions GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for extensions request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let connstr = compute.connstr.clone();
let res = crate::installed_extensions::get_installed_extensions(connstr).await;
match res {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
Err(e) => render_json_error(
&format!("could not get list of installed extensions: {}", e),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);

View File

@@ -53,6 +53,20 @@ paths:
schema:
$ref: "#/components/schemas/ComputeInsights"
/installed_extensions:
get:
tags:
- Info
summary: Get installed extensions.
description: ""
operationId: getInstalledExtensions
responses:
200:
description: List of installed extensions
content:
application/json:
schema:
$ref: "#/components/schemas/InstalledExtensions"
/info:
get:
tags:
@@ -395,6 +409,24 @@ components:
- configuration
example: running
InstalledExtensions:
type: object
properties:
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
versions:
type: array
items:
type: string
n_databases:
type: integer
#
# Errors
#

View File

@@ -0,0 +1,80 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use std::collections::HashMap;
use std::collections::HashSet;
use url::Url;
use anyhow::Result;
use postgres::{Client, NoTls};
use tokio::task;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
/// Limit the number of databases to 500 to avoid excessive load.
fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state
let databases = client
.query(
"SELECT datname FROM pg_catalog.pg_database
WHERE datallowconn
AND datconnlimit <> - 2
LIMIT 500",
&[],
)?
.iter()
.map(|row| {
let db: String = row.get("datname");
db
})
.collect();
Ok(databases)
}
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
let mut connstr = connstr.clone();
task::spawn_blocking(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let databases: Vec<String> = list_dbs(&mut client)?;
let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
for db in databases.iter() {
connstr.set_path(db);
let mut db_client = Client::connect(connstr.as_str(), NoTls)?;
let extensions: Vec<(String, String)> = db_client
.query(
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
&[],
)?
.iter()
.map(|row| (row.get("extname"), row.get("extversion")))
.collect();
for (extname, v) in extensions.iter() {
let version = v.to_string();
extensions_map
.entry(extname.to_string())
.and_modify(|e| {
e.versions.insert(version.clone());
// count the number of databases where the extension is installed
e.n_databases += 1;
})
.or_insert(InstalledExtension {
extname: extname.to_string(),
versions: HashSet::from([version.clone()]),
n_databases: 1,
});
}
}
Ok(InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
})
})
.await?
}

View File

@@ -15,6 +15,7 @@ pub mod catalog;
pub mod compute;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
mod migration;

View File

@@ -318,12 +318,26 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
name.pg_quote()
);
// If the role we're creating is intended for JWT login, we do
// not give it any attributes.
if jwks_roles.contains(name.as_str()) {
query = format!("CREATE ROLE {}", name.pg_quote());
}
info!("running role create query: '{}'", &query);
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
// If the role we're creating is intended for JWT login, we have
// to make sure it can execute functions in the auth schema.
if jwks_roles.contains(name.as_str()) {
let mut grant_query = format!(
"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA auth TO {}",
name.pg_quote()
);
info!("running grant query for JWT role: '{}'", &grant_query);
grant_query.push_str(&role.to_pg_options());
xact.execute(grant_query.as_str(), &[])?;
}
}
}

View File

@@ -1,5 +1,6 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::collections::HashSet;
use std::fmt::Display;
use chrono::{DateTime, Utc};
@@ -155,3 +156,15 @@ pub enum ControlPlaneComputeStatus {
// should be able to start with provided spec.
Attached,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct InstalledExtension {
pub extname: String,
pub versions: HashSet<String>,
pub n_databases: u32, // Number of databases using this extension
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct InstalledExtensions {
pub extensions: Vec<InstalledExtension>,
}

View File

@@ -104,8 +104,7 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -388,10 +387,7 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,
virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
}
}

View File

@@ -972,8 +972,6 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -994,50 +992,45 @@ pub mod virtual_file {
}
/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum IoMode {
/// Uses buffered IO.
Buffered,
/// Uses direct IO, error out if the operation fails.
#[cfg(target_os = "linux")]
Direct,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
impl IoMode {
pub const fn preferred() -> Self {
Self::Buffered
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
impl TryFrom<u8> for IoMode {
type Error = u8;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
x => return Err(x),
})
}
}
}

View File

@@ -496,26 +496,12 @@ impl RemoteStorage for AzureBlobStorage {
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
}
self.download_for_builder(builder, cancel).await
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
let mut builder = blob_client.get();
let range: Range = if let Some(end_exclusive) = end_exclusive {
(start_inclusive..end_exclusive).into()
} else {
(start_inclusive..).into()
};
builder = builder.range(range);
if let Some((start, end)) = opts.byte_range() {
builder = builder.range(match end {
Some(end) => Range::Range(start..end),
None => Range::RangeFrom(start..),
});
}
self.download_for_builder(builder, cancel).await
}

View File

@@ -19,7 +19,8 @@ mod simulate_failures;
mod support;
use std::{
collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
time::SystemTime,
};
use anyhow::Context;
@@ -162,11 +163,60 @@ pub struct Listing {
}
/// Options for downloads. The default value is a plain GET.
#[derive(Default)]
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
/// the same ETag (using If-None-Match).
pub etag: Option<Etag>,
/// The start of the byte range to download, or unbounded.
pub byte_start: Bound<u64>,
/// The end of the byte range to download, or unbounded. Must be after the
/// start bound.
pub byte_end: Bound<u64>,
}
impl Default for DownloadOpts {
fn default() -> Self {
Self {
etag: Default::default(),
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
}
}
}
impl DownloadOpts {
/// Returns the byte range with inclusive start and exclusive end, or None
/// if unbounded.
pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
return None;
}
let start = match self.byte_start {
Bound::Excluded(i) => i + 1,
Bound::Included(i) => i,
Bound::Unbounded => 0,
};
let end = match self.byte_end {
Bound::Excluded(i) => Some(i),
Bound::Included(i) => Some(i + 1),
Bound::Unbounded => None,
};
if let Some(end) = end {
assert!(start < end, "range end {end} at or before start {start}");
}
Some((start, end))
}
/// Returns the byte range as an RFC 2616 Range header value with inclusive
/// bounds, or None if unbounded.
pub fn byte_range_header(&self) -> Option<String> {
self.byte_range()
.map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
.map(|(start, end)| match end {
Some(end) => format!("bytes={start}-{end}"),
None => format!("bytes={start}-"),
})
}
}
/// Storage (potentially remote) API to manage its state.
@@ -257,21 +307,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Streams a given byte range of the remote storage entry contents.
///
/// The returned download stream will obey initial timeout and cancellation signal by erroring
/// on whichever happens first. Only one of the reasons will fail the stream, which is usually
/// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
///
/// Returns the metadata, if any was stored with the file previously.
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError>;
/// Delete a single path from remote storage.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -425,33 +460,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
pub async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::AwsS3(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::AzureBlob(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
}
}
/// See [`RemoteStorage::delete`]
pub async fn delete(
&self,
@@ -573,20 +581,6 @@ impl GenericRemoteStorage {
})
}
/// Downloads the storage object into the `to_path` provided.
/// `byte_range` could be specified to dowload only a part of the file, if needed.
pub async fn download_storage_object(
&self,
byte_range: Option<(u64, Option<u64>)>,
from: &RemotePath,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
match byte_range {
Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
None => self.download(from, &DownloadOpts::default(), cancel).await,
}
}
/// The name of the bucket/container/etc.
pub fn bucket_name(&self) -> Option<&str> {
match self {
@@ -660,6 +654,76 @@ impl ConcurrencyLimiter {
mod tests {
use super::*;
/// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
/// with optional end bound, or None when unbounded.
#[test]
fn download_opts_byte_range() {
// Consider using test_case or a similar table-driven test framework.
let cases = [
// (byte_start, byte_end, expected)
(Bound::Unbounded, Bound::Unbounded, None),
(Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
(Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
(Bound::Included(3), Bound::Unbounded, Some((3, None))),
(Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
(Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
(Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
(Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
(Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
// 1-sized ranges are fine, 0 aren't and will panic (separate test).
(Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
(Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
];
for (byte_start, byte_end, expect) in cases {
let opts = DownloadOpts {
byte_start,
byte_end,
..Default::default()
};
let result = opts.byte_range();
assert_eq!(
result, expect,
"byte_start={byte_start:?} byte_end={byte_end:?}"
);
// Check generated HTTP header, which uses an inclusive range.
let expect_header = expect.map(|(start, end)| match end {
Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
None => format!("bytes={start}-"),
});
assert_eq!(
opts.byte_range_header(),
expect_header,
"byte_start={byte_start:?} byte_end={byte_end:?}"
);
}
}
/// DownloadOpts::byte_range() zero-sized byte range should panic.
#[test]
#[should_panic]
fn download_opts_byte_range_zero() {
DownloadOpts {
byte_start: Bound::Included(3),
byte_end: Bound::Excluded(3),
..Default::default()
}
.byte_range();
}
/// DownloadOpts::byte_range() negative byte range should panic.
#[test]
#[should_panic]
fn download_opts_byte_range_negative() {
DownloadOpts {
byte_start: Bound::Included(3),
byte_end: Bound::Included(2),
..Default::default()
}
.byte_range();
}
#[test]
fn test_object_name() {
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();

View File

@@ -506,54 +506,7 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Unmodified);
}
let source = ReaderStream::new(
fs::OpenOptions::new()
.read(true)
.open(&target_path)
.await
.with_context(|| {
format!("Failed to open source file {target_path:?} to use in the download")
})
.map_err(DownloadError::Other)?,
);
let metadata = self
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
Ok(Download {
metadata,
last_modified: file_metadata
.modified()
.map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
etag,
download_stream: Box::pin(source),
})
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
if end_exclusive <= start_inclusive {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
};
if start_inclusive == end_exclusive.saturating_sub(1) {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let target_path = from.with_base(&self.storage_root);
let file_metadata = file_metadata(&target_path).await?;
let mut source = tokio::fs::OpenOptions::new()
let mut file = fs::OpenOptions::new()
.read(true)
.open(&target_path)
.await
@@ -562,31 +515,29 @@ impl RemoteStorage for LocalFs {
})
.map_err(DownloadError::Other)?;
let len = source
.metadata()
.await
.context("query file length")
.map_err(DownloadError::Other)?
.len();
let mut take = file_metadata.len();
if let Some((start, end)) = opts.byte_range() {
if start > 0 {
file.seek(io::SeekFrom::Start(start))
.await
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
}
if let Some(end) = end {
take = end - start;
}
}
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let source = ReaderStream::new(file.take(take));
let metadata = self
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
let source = ReaderStream::new(source);
let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
let etag = mock_etag(&file_metadata);
Ok(Download {
metadata,
last_modified: file_metadata
@@ -688,7 +639,7 @@ mod fs_tests {
use super::*;
use camino_tempfile::tempdir;
use std::{collections::HashMap, io::Write};
use std::{collections::HashMap, io::Write, ops::Bound};
async fn read_and_check_metadata(
storage: &LocalFs,
@@ -804,10 +755,12 @@ mod fs_tests {
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let first_part_download = storage
.download_byte_range(
.download(
&upload_target,
0,
Some(first_part_local.len() as u64),
&DownloadOpts {
byte_end: Bound::Excluded(first_part_local.len() as u64),
..Default::default()
},
&cancel,
)
.await?;
@@ -823,10 +776,15 @@ mod fs_tests {
);
let second_part_download = storage
.download_byte_range(
.download(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&DownloadOpts {
byte_start: Bound::Included(first_part_local.len() as u64),
byte_end: Bound::Excluded(
(first_part_local.len() + second_part_local.len()) as u64,
),
..Default::default()
},
&cancel,
)
.await?;
@@ -842,7 +800,14 @@ mod fs_tests {
);
let suffix_bytes = storage
.download_byte_range(&upload_target, 13, None, &cancel)
.download(
&upload_target,
&DownloadOpts {
byte_start: Bound::Included(13),
..Default::default()
},
&cancel,
)
.await?
.download_stream;
let suffix_bytes = aggregate(suffix_bytes).await?;
@@ -850,7 +815,7 @@ mod fs_tests {
assert_eq!(upload_name, suffix);
let all_bytes = storage
.download_byte_range(&upload_target, 0, None, &cancel)
.download(&upload_target, &DownloadOpts::default(), &cancel)
.await?
.download_stream;
let all_bytes = aggregate(all_bytes).await?;
@@ -861,48 +826,26 @@ mod fs_tests {
}
#[tokio::test]
async fn download_file_range_negative() -> anyhow::Result<()> {
let (storage, cancel) = create_storage()?;
#[should_panic(expected = "at or before start")]
async fn download_file_range_negative() {
let (storage, cancel) = create_storage().unwrap();
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
.await
.unwrap();
let start = 1_000_000_000;
let end = start + 1;
match storage
.download_byte_range(
storage
.download(
&upload_target,
start,
Some(end), // exclusive end
&DownloadOpts {
byte_start: Bound::Included(10),
byte_end: Bound::Excluded(10),
..Default::default()
},
&cancel,
)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("zero bytes"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end), &cancel)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("Invalid range"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
Ok(())
.unwrap();
}
#[tokio::test]
@@ -945,10 +888,12 @@ mod fs_tests {
let (first_part_local, _) = uploaded_bytes.split_at(3);
let partial_download_with_metadata = storage
.download_byte_range(
.download(
&upload_target,
0,
Some(first_part_local.len() as u64),
&DownloadOpts {
byte_end: Bound::Excluded(first_part_local.len() as u64),
..Default::default()
},
&cancel,
)
.await?;

View File

@@ -804,34 +804,7 @@ impl RemoteStorage for S3Bucket {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: None,
},
cancel,
)
.await
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
let range = Some(match end_inclusive {
Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
None => format!("bytes={start_inclusive}-"),
});
self.download_object(
GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self.relative_path_to_s3_object(from),
etag: None,
range,
range: opts.byte_range_header(),
},
cancel,
)

View File

@@ -170,28 +170,13 @@ impl RemoteStorage for UnreliableWrapper {
opts: &DownloadOpts,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// Note: We treat any byte range as an "attempt" of the same operation.
// We don't pay attention to the ranges. That's good enough for now.
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner.download(from, opts, cancel).await
}
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
// for now.
self.attempt(RemoteOp::Download(from.clone()))
.map_err(DownloadError::Other)?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive, cancel)
.await
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
self.delete_inner(path, true, cancel).await
}

View File

@@ -2,6 +2,7 @@ use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use remote_storage::{DownloadError, DownloadOpts, ListingMode, ListingObject, RemotePath};
use std::ops::Bound;
use std::sync::Arc;
use std::{collections::HashSet, num::NonZeroU32};
use test_context::test_context;
@@ -293,7 +294,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Full range (end specified)
let dl = ctx
.client
.download_byte_range(&path, 0, Some(len as u64), &cancel)
.download(
&path,
&DownloadOpts {
byte_start: Bound::Included(0),
byte_end: Bound::Excluded(len as u64),
..Default::default()
},
&cancel,
)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
@@ -301,7 +310,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// partial range (end specified)
let dl = ctx
.client
.download_byte_range(&path, 4, Some(10), &cancel)
.download(
&path,
&DownloadOpts {
byte_start: Bound::Included(4),
byte_end: Bound::Excluded(10),
..Default::default()
},
&cancel,
)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..10]);
@@ -309,7 +326,15 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// partial range (end beyond real end)
let dl = ctx
.client
.download_byte_range(&path, 8, Some(len as u64 * 100), &cancel)
.download(
&path,
&DownloadOpts {
byte_start: Bound::Included(8),
byte_end: Bound::Excluded(len as u64 * 100),
..Default::default()
},
&cancel,
)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[8..]);
@@ -317,7 +342,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Partial range (end unspecified)
let dl = ctx
.client
.download_byte_range(&path, 4, None, &cancel)
.download(
&path,
&DownloadOpts {
byte_start: Bound::Included(4),
..Default::default()
},
&cancel,
)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig[4..]);
@@ -325,7 +357,14 @@ async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<
// Full range (end unspecified)
let dl = ctx
.client
.download_byte_range(&path, 0, None, &cancel)
.download(
&path,
&DownloadOpts {
byte_start: Bound::Included(0),
..Default::default()
},
&cancel,
)
.await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);

View File

@@ -164,11 +164,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(16384, virtual_file::io_engine_for_bench());
page_cache::init(conf.page_cache_size);
{

View File

@@ -540,10 +540,13 @@ impl Client {
.map_err(Error::ReceiveBody)
}
/// Configs io buffer alignment at runtime.
pub async fn put_io_alignment(&self, align: usize) -> Result<()> {
let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, align)
/// Configs io mode at runtime.
pub async fn put_io_mode(
&self,
mode: &pageserver_api::models::virtual_file::IoMode,
) -> Result<()> {
let uri = format!("{}/v1/io_mode", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, mode)
.await?
.json()
.await

View File

@@ -152,11 +152,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
@@ -190,11 +190,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT,
);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -26,7 +26,7 @@ use pageserver::{
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use pageserver_api::{config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, shard::TenantShardId};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
use remote_storage::{RemotePath, RemoteStorageConfig};
use tokio_util::sync::CancellationToken;
@@ -205,11 +205,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
DEFAULT_IO_BUFFER_ALIGNMENT,
);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -59,9 +59,9 @@ pub(crate) struct Args {
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
/// Before starting the benchmark, live-reconfigure the pageserver to use specified alignment for io buffers.
/// Before starting the benchmark, live-reconfigure the pageserver to use specified io mode (buffered vs. direct).
#[clap(long)]
set_io_alignment: Option<usize>,
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -129,8 +129,8 @@ async fn main_impl(
mgmt_api_client.put_io_engine(engine_str).await?;
}
if let Some(align) = args.set_io_alignment {
mgmt_api_client.put_io_alignment(align).await?;
if let Some(mode) = &args.set_io_mode {
mgmt_api_client.put_io_mode(mode).await?;
}
// discover targets

View File

@@ -125,8 +125,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.
@@ -168,11 +167,7 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.io_buffer_alignment,
);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -174,9 +174,7 @@ pub struct PageServerConf {
pub l0_flush: crate::l0_flush::L0FlushConfig,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
pub virtual_file_io_mode: virtual_file::IoMode,
}
/// Token for authentication to safekeepers
@@ -325,11 +323,10 @@ impl PageServerConf {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_direct_io,
virtual_file_io_mode,
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
io_buffer_alignment,
tenant_config,
} = config_toml;
@@ -368,8 +365,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
virtual_file_direct_io,
io_buffer_alignment,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -408,6 +403,7 @@ impl PageServerConf {
l0_flush: l0_flush
.map(crate::l0_flush::L0FlushConfig::from)
.unwrap_or_default(),
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
};
// ------------------------------------------------------------

View File

@@ -17,6 +17,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
@@ -2381,17 +2382,13 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_alignment_handler(
async fn put_io_mode_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let align: usize = json_request(&mut r).await?;
crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| {
ApiError::PreconditionFailed(
format!("Requested io alignment ({align}) is not a power of two").into(),
)
})?;
let mode: IoMode = json_request(&mut r).await?;
crate::virtual_file::set_io_mode(mode);
json_response(StatusCode::OK, ())
}
@@ -3082,9 +3079,7 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_alignment", |r| {
api_handler(r, put_io_alignment_handler)
})
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),

View File

@@ -84,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.buffered_writer.as_inner().as_inner().path;
let path = self.buffered_writer.as_inner().as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -356,7 +356,7 @@ mod tests {
}
let file_contents =
std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let buffer_contents = file.buffered_writer.inspect_buffer();
@@ -392,7 +392,7 @@ mod tests {
.buffered_writer
.as_inner()
.as_inner()
.path
.path()
.metadata()
.unwrap();
assert_eq!(

View File

@@ -950,6 +950,7 @@ impl<'a> TenantDownloader<'a> {
let cancel = &self.secondary_state.cancel;
let opts = DownloadOpts {
etag: prev_etag.cloned(),
..Default::default()
};
backoff::retry(

View File

@@ -573,7 +573,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path,
file.path(),
metadata.len()
);
@@ -791,7 +791,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
@@ -1022,7 +1022,7 @@ impl DeltaLayerInner {
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);
@@ -1048,7 +1048,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1066,7 +1066,7 @@ impl DeltaLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -1198,7 +1198,6 @@ impl DeltaLayerInner {
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
let align = virtual_file::get_io_buffer_alignment();
let max_read_size = self
.max_vectored_read_bytes
@@ -1247,7 +1246,6 @@ impl DeltaLayerInner {
offsets.end.pos(),
meta,
max_read_size,
align,
))
}
} else {

View File

@@ -389,7 +389,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file_id = page_cache::next_file_id();
@@ -626,7 +626,7 @@ impl ImageLayerInner {
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path,
self.file.path(),
))),
);
@@ -647,7 +647,7 @@ impl ImageLayerInner {
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
self.file.path(),
kind
)),
);

View File

@@ -194,8 +194,6 @@ pub(crate) struct ChunkedVectoredReadBuilder {
/// Start offset and metadata for each blob in this read
blobs_at: VecMap<u64, BlobMeta>,
max_read_size: Option<usize>,
/// Chunk size reads are coalesced into.
chunk_size: usize,
}
/// Computes x / d rounded up.
@@ -204,6 +202,7 @@ fn div_round_up(x: usize, d: usize) -> usize {
}
impl ChunkedVectoredReadBuilder {
const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
/// Start building a new vectored read.
///
/// Note that by design, this does not check against reading more than `max_read_size` to
@@ -214,21 +213,19 @@ impl ChunkedVectoredReadBuilder {
end_offset: u64,
meta: BlobMeta,
max_read_size: Option<usize>,
chunk_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.expect("First insertion always succeeds");
let start_blk_no = start_offset as usize / chunk_size;
let end_blk_no = div_round_up(end_offset as usize, chunk_size);
let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end_offset as usize, Self::CHUNK_SIZE);
Self {
start_blk_no,
end_blk_no,
blobs_at,
max_read_size,
chunk_size,
}
}
@@ -237,18 +234,12 @@ impl ChunkedVectoredReadBuilder {
end_offset: u64,
meta: BlobMeta,
max_read_size: usize,
align: usize,
) -> Self {
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), align)
Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
}
pub(crate) fn new_streaming(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
align: usize,
) -> Self {
Self::new_impl(start_offset, end_offset, meta, None, align)
pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
Self::new_impl(start_offset, end_offset, meta, None)
}
/// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
@@ -256,12 +247,12 @@ impl ChunkedVectoredReadBuilder {
/// The resulting size also must be below the max read size.
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let start_blk_no = start as usize / self.chunk_size;
let end_blk_no = div_round_up(end as usize, self.chunk_size);
let start_blk_no = start as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end as usize, Self::CHUNK_SIZE);
let not_limited_by_max_read_size = {
if let Some(max_read_size) = self.max_read_size {
let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
coalesced_size <= max_read_size
} else {
true
@@ -292,12 +283,12 @@ impl ChunkedVectoredReadBuilder {
}
pub(crate) fn size(&self) -> usize {
(self.end_blk_no - self.start_blk_no) * self.chunk_size
(self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
}
pub(crate) fn build(self) -> VectoredRead {
let start = (self.start_blk_no * self.chunk_size) as u64;
let end = (self.end_blk_no * self.chunk_size) as u64;
let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
VectoredRead {
start,
end,
@@ -328,18 +319,14 @@ pub struct VectoredReadPlanner {
prev: Option<(Key, Lsn, u64, BlobFlag)>,
max_read_size: usize,
align: usize,
}
impl VectoredReadPlanner {
pub fn new(max_read_size: usize) -> Self {
let align = virtual_file::get_io_buffer_alignment();
Self {
blobs: BTreeMap::new(),
prev: None,
max_read_size,
align,
}
}
@@ -418,7 +405,6 @@ impl VectoredReadPlanner {
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
self.align,
);
let prev_read_builder = current_read_builder.replace(next_read_builder);
@@ -472,13 +458,13 @@ impl<'a> VectoredBlobReader<'a> {
);
if cfg!(debug_assertions) {
let align = virtual_file::get_io_buffer_alignment() as u64;
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
debug_assert_eq!(
read.start % align,
read.start % ALIGN,
0,
"Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
read.start,
align
ALIGN
);
}
@@ -553,22 +539,18 @@ pub struct StreamingVectoredReadPlanner {
max_cnt: usize,
/// Size of the current batch
cnt: usize,
align: usize,
}
impl StreamingVectoredReadPlanner {
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
assert!(max_cnt > 0);
assert!(max_read_size > 0);
let align = virtual_file::get_io_buffer_alignment();
Self {
read_builder: None,
prev: None,
max_cnt,
max_read_size,
cnt: 0,
align,
}
}
@@ -621,7 +603,6 @@ impl StreamingVectoredReadPlanner {
start_offset,
end_offset,
BlobMeta { key, lsn },
self.align,
))
};
}
@@ -656,9 +637,9 @@ mod tests {
use super::*;
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
let align = virtual_file::get_io_buffer_alignment() as u64;
assert_eq!(read.start % align, 0);
assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
assert_eq!(read.start % ALIGN, 0);
assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
@@ -676,32 +657,27 @@ mod tests {
fn planner_chunked_coalesce_all_test() {
use crate::virtual_file;
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
// The test explicitly does not check chunk size < 512
if chunk_size < 512 {
return;
}
let max_read_size = chunk_size as usize * 8;
let max_read_size = CHUNK_SIZE as usize * 8;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = [
(key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
(key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size / 2, BlobFlag::None),
(key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size, BlobFlag::None),
(key, lsn, chunk_size * 2 - 1, BlobFlag::None),
(key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
(key, lsn, chunk_size * 3 + 1, BlobFlag::None),
(key, lsn, chunk_size * 5 + 1, BlobFlag::None),
(key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
(key, lsn, chunk_size * 7 + 1, BlobFlag::None),
(key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
(key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
(key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
(key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
(key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
(key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
(key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
(key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
(key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
(key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
(key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
];
let ranges = [
@@ -780,19 +756,19 @@ mod tests {
#[test]
fn planner_replacement_test() {
let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
let max_read_size = 128 * chunk_size as usize;
const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
let max_read_size = 128 * CHUNK_SIZE as usize;
let first_key = Key::MIN;
let second_key = first_key.next();
let lsn = Lsn(0);
let blob_descriptions = vec![
(first_key, lsn, 0, BlobFlag::None), // First in read 1
(first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
(second_key, lsn, 3 * chunk_size, BlobFlag::None),
(second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
(second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
(first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
(second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
(second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
(second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
];
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
@@ -802,7 +778,7 @@ mod tests {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(6 * chunk_size);
planner.handle_range_end(6 * CHUNK_SIZE);
let reads = planner.finish();
assert_eq!(reads.len(), 2);
@@ -947,7 +923,6 @@ mod tests {
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);
let meta = BlobMeta {
key: Key::MIN,
@@ -959,8 +934,7 @@ mod tests {
if idx + 1 == offsets.len() {
continue;
}
let read_builder =
ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, align);
let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
let read = read_builder.build();
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
assert_eq!(result.blobs.len(), 1);

View File

@@ -23,10 +23,12 @@ use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
@@ -38,7 +40,7 @@ pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::DirectIoMode;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
@@ -61,6 +63,171 @@ pub(crate) mod owned_buffers_io {
}
}
#[derive(Debug)]
pub struct VirtualFile {
inner: VirtualFileInner,
_mode: IoMode,
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::open(path, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
/// Open a file in read-only mode. Like File::open.
///
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
pub async fn open_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::create(path, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn create_v2<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
VirtualFile::open_with_options_v2(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile {
inner,
_mode: IoMode::Buffered,
})
}
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<Self, std::io::Error> {
let file = match get_io_mode() {
IoMode::Buffered => {
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
VirtualFile {
inner,
_mode: IoMode::Buffered,
}
}
#[cfg(target_os = "linux")]
IoMode::Direct => {
let inner = VirtualFileInner::open_with_options(
path,
open_options.clone().custom_flags(nix::libc::O_DIRECT),
ctx,
)
.await?;
VirtualFile {
inner,
_mode: IoMode::Direct,
}
}
};
Ok(file)
}
pub fn path(&self) -> &Utf8Path {
self.inner.path.as_path()
}
pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
final_path: Utf8PathBuf,
tmp_path: Utf8PathBuf,
content: B,
) -> std::io::Result<()> {
VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
}
pub async fn sync_all(&self) -> Result<(), Error> {
self.inner.sync_all().await
}
pub async fn sync_data(&self) -> Result<(), Error> {
self.inner.sync_data().await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
pub fn remove(self) {
self.inner.remove();
}
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner.seek(pos).await
}
pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
where
Buf: IoBufMut + Send,
{
self.inner.read_exact_at(slice, offset, ctx).await
}
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
self.inner.read_exact_at_page(page, offset, ctx).await
}
pub async fn write_all_at<Buf: IoBuf + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
self.inner.write_all_at(buf, offset, ctx).await
}
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner.write_all(buf, ctx).await
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -77,7 +244,7 @@ pub(crate) mod owned_buffers_io {
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFile {
pub struct VirtualFileInner {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
@@ -350,12 +517,12 @@ macro_rules! with_file {
}};
}
impl VirtualFile {
impl VirtualFileInner {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
@@ -364,7 +531,7 @@ impl VirtualFile {
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(
path.as_ref(),
OpenOptions::new().write(true).create(true).truncate(true),
@@ -382,7 +549,7 @@ impl VirtualFile {
path: P,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFileInner, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -423,7 +590,7 @@ impl VirtualFile {
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
@@ -1034,6 +1201,21 @@ impl tokio_epoll_uring::IoFd for FileGuard {
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
self.inner.read_blk(blknum, ctx).await
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner.read_to_end(buf, ctx).await
}
}
#[cfg(test)]
impl VirtualFileInner {
pub(crate) async fn read_blk(
&self,
blknum: u32,
@@ -1067,7 +1249,7 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
impl Drop for VirtualFileInner {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
@@ -1143,15 +1325,10 @@ impl OpenFiles {
/// server startup.
///
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) {
pub fn init(num_slots: usize, engine: IoEngineKind) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
if set_io_buffer_alignment(io_buffer_alignment).is_err() {
panic!(
"IO buffer alignment needs to be a power of two and greater than 512, got {io_buffer_alignment}"
);
}
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
@@ -1175,47 +1352,20 @@ fn get_open_files() -> &'static OpenFiles {
}
}
static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT);
/// Returns true if the alignment is a power of two and is greater or equal to 512.
fn is_valid_io_buffer_alignment(align: usize) -> bool {
align.is_power_of_two() && align >= 512
}
/// Sets IO buffer alignment requirement. Returns error if the alignment requirement is
/// not a power of two or less than 512 bytes.
#[allow(unused)]
pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> {
if is_valid_io_buffer_alignment(align) {
IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed);
Ok(())
} else {
Err(align)
}
}
/// Gets the io buffer alignment.
///
/// This function should be used for getting the actual alignment value to use.
pub(crate) fn get_io_buffer_alignment() -> usize {
let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed);
if cfg!(test) {
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT";
if let Some(test_align) = utils::env::var(env_var_name) {
if is_valid_io_buffer_alignment(test_align) {
test_align
} else {
panic!("IO buffer alignment needs to be a power of two and greater than 512, got {test_align}");
}
} else {
align
}
} else {
align
}
pub(crate) const fn get_io_buffer_alignment() -> usize {
DEFAULT_IO_BUFFER_ALIGNMENT
}
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn get_io_mode() -> IoMode {
IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
}
#[cfg(test)]
mod tests {
use crate::context::DownloadBehavior;
@@ -1524,7 +1674,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1576,7 +1726,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1585,7 +1735,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
@@ -1608,7 +1758,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();

29
poetry.lock generated
View File

@@ -2095,6 +2095,7 @@ files = [
{file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"},
{file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"},
@@ -2103,6 +2104,8 @@ files = [
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"},
{file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"},
{file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"},
@@ -2584,6 +2587,7 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -2729,21 +2733,22 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "responses"
version = "0.21.0"
version = "0.25.3"
description = "A utility library for mocking out the `requests` Python library."
optional = false
python-versions = ">=3.7"
python-versions = ">=3.8"
files = [
{file = "responses-0.21.0-py3-none-any.whl", hash = "sha256:2dcc863ba63963c0c3d9ee3fa9507cbe36b7d7b0fccb4f0bdfd9e96c539b1487"},
{file = "responses-0.21.0.tar.gz", hash = "sha256:b82502eb5f09a0289d8e209e7bad71ef3978334f56d09b444253d5ad67bf5253"},
{file = "responses-0.25.3-py3-none-any.whl", hash = "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb"},
{file = "responses-0.25.3.tar.gz", hash = "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba"},
]
[package.dependencies]
requests = ">=2.0,<3.0"
urllib3 = ">=1.25.10"
pyyaml = "*"
requests = ">=2.30.0,<3.0"
urllib3 = ">=1.25.10,<3.0"
[package.extras]
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-localserver", "types-mock", "types-requests"]
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"]
[[package]]
name = "rfc3339-validator"
@@ -3137,6 +3142,16 @@ files = [
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
{file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
{file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},

View File

@@ -39,7 +39,7 @@ http.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper = { workspace = true, features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
indexmap.workspace = true
@@ -77,7 +77,7 @@ subtle.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres.workspace = true
tokio-postgres = { workspace = true, features = ["with-serde_json-1"] }
tokio-postgres-rustls.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
@@ -101,7 +101,7 @@ jose-jwa = "0.1.2"
jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
signature = "2"
ecdsa = "0.16"
p256 = "0.13"
p256 = { version = "0.13", features = ["jwk"] }
rsa = "0.9"
workspace_hack.workspace = true

View File

@@ -17,6 +17,8 @@ use crate::{
RoleName,
};
use super::ComputeCredentialKeys;
// TODO(conrad): make these configurable.
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
const MIN_RENEW: Duration = Duration::from_secs(30);
@@ -241,7 +243,7 @@ impl JwkCacheEntryLock {
endpoint: EndpointId,
role_name: &RoleName,
fetch: &F,
) -> Result<(), anyhow::Error> {
) -> Result<ComputeCredentialKeys, anyhow::Error> {
// JWT compact form is defined to be
// <B64(Header)> || . || <B64(Payload)> || . || <B64(Signature)>
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
@@ -300,9 +302,9 @@ impl JwkCacheEntryLock {
key => bail!("unsupported key type {key:?}"),
};
let payload = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
let payloadb = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payload)
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payloadb)
.context("Provided authentication token is not a valid JWT encoding")?;
tracing::debug!(?payload, "JWT signature valid with claims");
@@ -327,7 +329,7 @@ impl JwkCacheEntryLock {
);
}
Ok(())
Ok(ComputeCredentialKeys::JwtPayload(payloadb))
}
}
@@ -339,7 +341,7 @@ impl JwkCache {
role_name: &RoleName,
fetch: &F,
jwt: &str,
) -> Result<(), anyhow::Error> {
) -> Result<ComputeCredentialKeys, anyhow::Error> {
// try with just a read lock first
let key = (endpoint.clone(), role_name.clone());
let entry = self.map.get(&key).as_deref().map(Arc::clone);
@@ -571,7 +573,7 @@ mod tests {
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use hyper1::service::service_fn;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;
use rsa::pkcs8::DecodePrivateKey;
@@ -736,7 +738,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
});
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let server = hyper1::server::conn::http1::Builder::new();
let server = hyper::server::conn::http1::Builder::new();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {

View File

@@ -175,10 +175,12 @@ impl ComputeUserInfo {
}
}
#[cfg_attr(test, derive(Debug))]
pub(crate) enum ComputeCredentialKeys {
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
JwtPayload(Vec<u8>),
None,
}

View File

@@ -309,7 +309,7 @@ impl NodeInfo {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => self.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
ComputeCredentialKeys::None => &mut self.config,
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
};
}
}

View File

@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail};
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use measured::{text::BufferedTextEncoder, MetricGroup};
use metrics::NeonMetrics;
use std::{
@@ -21,7 +21,7 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
}
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
let state = Arc::new(Mutex::new(PrometheusHandler {
encoder: BufferedTextEncoder::new(),
metrics,
@@ -45,7 +45,7 @@ pub async fn task_main(
let service = || RouterService::new(make_router(metrics).build()?);
hyper::Server::from_tcp(http_listener)?
hyper0::Server::from_tcp(http_listener)?
.serve(service().map_err(|e| anyhow!(e))?)
.await?;

View File

@@ -9,7 +9,7 @@ use std::time::Duration;
use anyhow::bail;
use bytes::Bytes;
use http_body_util::BodyExt;
use hyper1::body::Body;
use hyper::body::Body;
use serde::de::DeserializeOwned;
pub(crate) use reqwest::{Request, Response};

View File

@@ -90,8 +90,6 @@ use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::warn;
extern crate hyper0 as hyper;
pub mod auth;
pub mod cache;
pub mod cancellation;

View File

@@ -7,7 +7,7 @@ use crate::metrics::{
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
use hyper1::StatusCode;
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;

View File

@@ -3,10 +3,12 @@ use std::{io, sync::Arc, time::Duration};
use async_trait::async_trait;
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use tokio::net::{lookup_host, TcpStream};
use tracing::{field::display, info};
use tokio_postgres::types::ToSql;
use tracing::{debug, field::display, info};
use crate::{
auth::{
self,
backend::{local::StaticAuthRules, ComputeCredentials, ComputeUserInfo},
check_peer_addr_is_in_list, AuthError,
},
@@ -32,10 +34,12 @@ use crate::{
use super::{
conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool},
http_conn_pool::{self, poll_http2_client},
local_conn_pool::{self, LocalClient, LocalConnPool},
};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -112,7 +116,7 @@ impl PoolingBackend {
config: &AuthenticationConfig,
user_info: &ComputeUserInfo,
jwt: String,
) -> Result<(), AuthError> {
) -> Result<ComputeCredentials, AuthError> {
match &self.config.auth_backend {
crate::auth::Backend::ControlPlane(console, ()) => {
config
@@ -127,13 +131,16 @@ impl PoolingBackend {
.await
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
Ok(())
Ok(ComputeCredentials {
info: user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
})
}
crate::auth::Backend::ConsoleRedirect(_, ()) => Err(AuthError::auth_failed(
"JWT login over web auth proxy is not supported",
)),
crate::auth::Backend::Local(_) => {
config
let keys = config
.jwks_cache
.check_jwt(
ctx,
@@ -145,8 +152,10 @@ impl PoolingBackend {
.await
.map_err(|e| AuthError::auth_failed(e.to_string()))?;
// todo: rewrite JWT signature with key shared somehow between local proxy and postgres
Ok(())
Ok(ComputeCredentials {
info: user_info.clone(),
keys,
})
}
}
}
@@ -231,6 +240,77 @@ impl PoolingBackend {
)
.await
}
/// Connect to postgres over localhost.
///
/// We expect postgres to be started here, so we won't do any retries.
///
/// # Panics
///
/// Panics if called with a non-local_proxy backend.
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub(crate) async fn connect_to_local_postgres(
&self,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
) -> Result<LocalClient<tokio_postgres::Client>, HttpConnError> {
if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
let mut node_info = match &self.config.auth_backend {
auth::Backend::ControlPlane(_, ()) | auth::Backend::ConsoleRedirect(_, ()) => {
unreachable!("only local_proxy can connect to local postgres")
}
auth::Backend::Local(local) => local.node_info.clone(),
};
let config = node_info
.config
.user(&conn_info.user_info.user)
.dbname(&conn_info.dbname);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
drop(pause);
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
let handle = local_conn_pool::poll_client(
self.local_pool.clone(),
ctx,
conn_info,
client,
connection,
conn_id,
node_info.aux.clone(),
);
let kid = handle.get_client().get_process_id() as i64;
let jwk = p256::PublicKey::from(handle.key().verifying_key()).to_jwk();
debug!(kid, ?jwk, "setting up backend session state");
// initiates the auth session
handle
.get_client()
.query(
"select auth.init($1, $2);",
&[
&kid as &(dyn ToSql + Sync),
&tokio_postgres::types::Json(jwk),
],
)
.await?;
info!(?kid, "backend session state init");
Ok(handle)
}
}
#[derive(Debug, thiserror::Error)]
@@ -241,6 +321,8 @@ pub(crate) enum HttpConnError {
PostgresConnectionError(#[from] tokio_postgres::Error),
#[error("could not connection to local-proxy in compute")]
LocalProxyConnectionError(#[from] LocalProxyConnError),
#[error("could not parse JWT payload")]
JwtPayloadError(serde_json::Error),
#[error("could not get auth info")]
GetAuthInfo(#[from] GetAuthInfoError),
@@ -257,7 +339,7 @@ pub(crate) enum LocalProxyConnError {
#[error("error with connection to local-proxy")]
Io(#[source] std::io::Error),
#[error("could not establish h2 connection")]
H2(#[from] hyper1::Error),
H2(#[from] hyper::Error),
}
impl ReportableError for HttpConnError {
@@ -266,6 +348,7 @@ impl ReportableError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
HttpConnError::JwtPayloadError(_) => ErrorKind::User,
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
HttpConnError::WakeCompute(w) => w.get_error_kind(),
@@ -280,6 +363,7 @@ impl UserFacingError for HttpConnError {
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
HttpConnError::PostgresConnectionError(p) => p.to_string(),
HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
HttpConnError::JwtPayloadError(p) => p.to_string(),
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
HttpConnError::WakeCompute(c) => c.to_string_client(),
@@ -296,6 +380,7 @@ impl CouldRetry for HttpConnError {
HttpConnError::PostgresConnectionError(e) => e.could_retry(),
HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
HttpConnError::ConnectionClosedAbruptly(_) => false,
HttpConnError::JwtPayloadError(_) => false,
HttpConnError::GetAuthInfo(_) => false,
HttpConnError::AuthError(_) => false,
HttpConnError::WakeCompute(_) => false,
@@ -481,7 +566,7 @@ async fn connect_http2(
};
};
let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new())
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.timer(TokioTimer::new())
.keep_alive_interval(Duration::from_secs(20))
.keep_alive_while_idle(true)

View File

@@ -1,5 +1,5 @@
use dashmap::DashMap;
use hyper1::client::conn::http2;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
@@ -18,9 +18,9 @@ use tracing::{info, info_span, Instrument};
use super::conn_pool::ConnInfo;
pub(crate) type Send = http2::SendRequest<hyper1::body::Incoming>;
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =
http2::Connection<TokioIo<TcpStream>, hyper1::body::Incoming, TokioExecutor>;
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
struct ConnPoolEntry {

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
use utils::http::error::ApiError;
/// Like [`ApiError::into_response`]
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper1::Error>> {
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper::Error>> {
match this {
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#?}"), // use debug printing so that we give the cause
@@ -67,12 +67,12 @@ impl HttpErrorBody {
fn response_from_msg_and_status(
msg: String,
status: StatusCode,
) -> Response<BoxBody<Bytes, hyper1::Error>> {
) -> Response<BoxBody<Bytes, hyper::Error>> {
HttpErrorBody { msg }.to_response(status)
}
/// Same as [`utils::http::error::HttpErrorBody::to_response`]
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper1::Error>> {
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "application/json")
@@ -90,7 +90,7 @@ impl HttpErrorBody {
pub(crate) fn json_response<T: Serialize>(
status: StatusCode,
data: T,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let json = serde_json::to_string(&data)
.context("Failed to serialize JSON response")
.map_err(ApiError::InternalServerError)?;

View File

@@ -0,0 +1,544 @@
use futures::{future::poll_fn, Future};
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use p256::ecdsa::{Signature, SigningKey};
use parking_lot::RwLock;
use rand::rngs::OsRng;
use serde_json::Value;
use signature::Signer;
use std::task::{ready, Poll};
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::types::ToSql;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use typed_json::json;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{context::RequestMonitoring, DbName, RoleName};
use tracing::{debug, error, warn, Span};
use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
use super::conn_pool::{ClientInnerExt, ConnInfo};
struct ConnPoolEntry<C: ClientInnerExt> {
conn: ClientInner<C>,
_last_access: std::time::Instant,
}
// /// key id for the pg_session_jwt state
// static PG_SESSION_JWT_KID: AtomicU64 = AtomicU64::new(1);
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
max_conns: usize,
global_pool_size_max_conns: usize,
}
impl<C: ClientInnerExt> EndpointConnPool<C> {
fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
let Self {
pools, total_conns, ..
} = self;
pools
.get_mut(&db_user)
.and_then(|pool_entries| pool_entries.get_conn_entry(total_conns))
}
fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
let Self {
pools, total_conns, ..
} = self;
if let Some(pool) = pools.get_mut(&db_user) {
let old_len = pool.conns.len();
pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
let new_len = pool.conns.len();
let removed = old_len - new_len;
if removed > 0 {
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
}
*total_conns -= removed;
removed > 0
} else {
false
}
}
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
let conn_id = client.conn_id;
if client.is_closed() {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because connection is closed");
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool.read().total_conns >= global_max_conn {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full");
return;
}
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < pool.max_conns {
let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "local_pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
}
}
impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
fn drop(&mut self) {
if self.total_conns > 0 {
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(self.total_conns as i64);
}
}
}
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
conns: Vec<ConnPoolEntry<C>>,
}
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
fn default() -> Self {
Self { conns: Vec::new() }
}
}
impl<C: ClientInnerExt> DbUserConnPool<C> {
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
let old_len = self.conns.len();
self.conns.retain(|conn| !conn.conn.is_closed());
let new_len = self.conns.len();
let removed = old_len - new_len;
*conns -= removed;
removed
}
fn get_conn_entry(&mut self, conns: &mut usize) -> Option<ConnPoolEntry<C>> {
let mut removed = self.clear_closed_clients(conns);
let conn = self.conns.pop();
if conn.is_some() {
*conns -= 1;
removed += 1;
}
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
conn
}
}
pub(crate) struct LocalConnPool<C: ClientInnerExt> {
global_pool: RwLock<EndpointConnPool<C>>,
config: &'static crate::config::HttpConfig,
}
impl<C: ClientInnerExt> LocalConnPool<C> {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
Arc::new(Self {
global_pool: RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: config.pool_options.max_conns_per_endpoint,
global_pool_size_max_conns: config.pool_options.max_total_conns,
}),
config,
})
}
pub(crate) fn get_idle_timeout(&self) -> Duration {
self.config.pool_options.idle_timeout
}
// pub(crate) fn shutdown(&self) {
// let mut pool = self.global_pool.write();
// pool.pools.clear();
// pool.total_conns = 0;
// }
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<LocalClient<C>>, HttpConnError> {
let mut client: Option<ClientInner<C>> = None;
if let Some(entry) = self
.global_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
// ok return cached connection if found and establish a new one otherwise
if let Some(client) = client {
if client.is_closed() {
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
info!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"local_pool: reusing connection '{conn_info}'"
);
client.session.send(ctx.session_id())?;
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(LocalClient::new(
client,
conn_info.clone(),
Arc::downgrade(self),
)));
}
Ok(None)
}
}
pub(crate) fn poll_client(
global_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
client: tokio_postgres::Client,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> LocalClient<tokio_postgres::Client> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
});
let pool = Arc::downgrade(&global_pool);
let pool_clone = pool.clone();
let db_user = conn_info.db_and_user();
let idle = global_pool.get_idle_timeout();
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);
poll_fn(move |cx| {
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
}
match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
}
_ => {}
}
// 5 minute idle connection timeout
if idle_timeout.as_mut().poll(cx).is_ready() {
idle_timeout.as_mut().reset(Instant::now() + idle);
info!("connection idle");
if let Some(pool) = pool.clone().upgrade() {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
info!("idle connection removed");
}
}
}
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session_id, "unknown message");
}
Some(Err(e)) => {
error!(%session_id, "connection error: {}", e);
break
}
None => {
info!("connection closed");
break
}
}
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;
}
.instrument(span));
let key = SigningKey::random(&mut OsRng);
let inner = ClientInner {
inner: client,
session: tx,
cancel,
aux,
conn_id,
key,
jti: 0,
};
LocalClient::new(inner, conn_info, pool_clone)
}
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
// needed for pg_session_jwt state
key: SigningKey,
jti: u64,
}
impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}
impl<C: ClientInnerExt> ClientInner<C> {
pub(crate) fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
impl<C: ClientInnerExt> LocalClient<C> {
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux;
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
}
pub(crate) struct LocalClient<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInner<C>>,
conn_info: ConnInfo,
pool: Weak<LocalConnPool<C>>,
}
pub(crate) struct Discard<'a, C: ClientInnerExt> {
conn_info: &'a ConnInfo,
pool: &'a mut Weak<LocalConnPool<C>>,
}
impl<C: ClientInnerExt> LocalClient<C> {
pub(self) fn new(
inner: ClientInner<C>,
conn_info: ConnInfo,
pool: Weak<LocalConnPool<C>>,
) -> Self {
Self {
inner: Some(inner),
span: Span::current(),
conn_info,
pool,
}
}
pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
let Self {
inner,
pool,
conn_info,
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { conn_info, pool })
}
pub(crate) fn key(&self) -> &SigningKey {
let inner = &self
.inner
.as_ref()
.expect("client inner should not be removed");
&inner.key
}
}
impl LocalClient<tokio_postgres::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
let inner = self
.inner
.as_mut()
.expect("client inner should not be removed");
inner.jti += 1;
let kid = inner.inner.get_process_id();
let header = json!({"kid":kid}).to_string();
let mut payload = serde_json::from_slice::<serde_json::Map<String, Value>>(payload)
.map_err(HttpConnError::JwtPayloadError)?;
payload.insert("jti".to_string(), Value::Number(inner.jti.into()));
let payload = Value::Object(payload).to_string();
debug!(
kid,
jti = inner.jti,
?header,
?payload,
"signing new ephemeral JWT"
);
let token = sign_jwt(&inner.key, header, payload);
// initiates the auth session
inner.inner.simple_query("discard all").await?;
inner
.inner
.query(
"select auth.jwt_session_init($1)",
&[&token as &(dyn ToSql + Sync)],
)
.await?;
info!(kid, jti = inner.jti, "user session state init");
Ok(())
}
}
fn sign_jwt(sk: &SigningKey, header: String, payload: String) -> String {
let header = Base64UrlUnpadded::encode_string(header.as_bytes());
let payload = Base64UrlUnpadded::encode_string(payload.as_bytes());
let message = format!("{header}.{payload}");
let sig: Signature = sk.sign(message.as_bytes());
let base64_sig = Base64UrlUnpadded::encode_string(&sig.to_bytes());
format!("{message}.{base64_sig}")
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(
"local_pool: throwing away connection '{conn_info}' because connection is not idle"
);
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}
impl<C: ClientInnerExt> LocalClient<C> {
pub fn get_client(&self) -> &C {
&self
.inner
.as_ref()
.expect("client inner should not be removed")
.inner
}
fn do_drop(&mut self) -> Option<impl FnOnce()> {
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
let current_span = self.span.clone();
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
EndpointConnPool::put(&conn_pool.global_pool, &conn_info, client);
});
}
None
}
}
impl<C: ClientInnerExt> Drop for LocalClient<C> {
fn drop(&mut self) {
if let Some(drop) = self.do_drop() {
tokio::task::spawn_blocking(drop);
}
}
}

View File

@@ -8,6 +8,7 @@ mod conn_pool;
mod http_conn_pool;
mod http_util;
mod json;
mod local_conn_pool;
mod sql_over_http;
mod websocket;
@@ -22,7 +23,7 @@ use futures::TryFutureExt;
use http::{Method, Response, StatusCode};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty};
use hyper1::body::Incoming;
use hyper::body::Incoming;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use rand::rngs::StdRng;
@@ -63,6 +64,7 @@ pub async fn task_main(
info!("websocket server has shut down");
}
let local_pool = local_conn_pool::LocalConnPool::new(&config.http_config);
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
@@ -105,6 +107,7 @@ pub async fn task_main(
let backend = Arc::new(PoolingBackend {
http_conn_pool: Arc::clone(&http_conn_pool),
local_pool,
pool: Arc::clone(&conn_pool),
config,
endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
@@ -302,7 +305,7 @@ async fn connection_handler(
let server = Builder::new(TokioExecutor::new());
let conn = server.serve_connection_with_upgrades(
hyper_util::rt::TokioIo::new(conn),
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
// First HTTP request shares the same session ID
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
@@ -355,7 +358,7 @@ async fn connection_handler(
#[allow(clippy::too_many_arguments)]
async fn request_handler(
mut request: hyper1::Request<Incoming>,
mut request: hyper::Request<Incoming>,
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
ws_connections: TaskTracker,
@@ -365,7 +368,7 @@ async fn request_handler(
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let host = request
.headers()
.get("host")

View File

@@ -12,14 +12,14 @@ use http::Method;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper1::body::Body;
use hyper1::body::Incoming;
use hyper1::header;
use hyper1::http::HeaderName;
use hyper1::http::HeaderValue;
use hyper1::Response;
use hyper1::StatusCode;
use hyper1::{HeaderMap, Request};
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::header;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{HeaderMap, Request};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
@@ -40,7 +40,7 @@ use url::Url;
use urlencoding;
use utils::http::error::ApiError;
use crate::auth::backend::ComputeCredentials;
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::auth::ComputeUserInfoParseError;
@@ -56,20 +56,22 @@ use crate::metrics::Metrics;
use crate::proxy::run_until_cancelled;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::MetricCounter;
use crate::usage_metrics::MetricCounterRecorder;
use crate::DbName;
use crate::RoleName;
use super::backend::LocalProxyConnError;
use super::backend::PoolingBackend;
use super::conn_pool;
use super::conn_pool::AuthData;
use super::conn_pool::Client;
use super::conn_pool::ConnInfo;
use super::conn_pool::ConnInfoWithAuth;
use super::http_util::json_response;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::json::JsonConversionError;
use super::local_conn_pool;
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -272,7 +274,7 @@ pub(crate) async fn handle(
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let mut response = match result {
@@ -435,7 +437,7 @@ impl UserFacingError for SqlOverHttpError {
#[derive(Debug, thiserror::Error)]
pub(crate) enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
Read(#[from] hyper1::Error),
Read(#[from] hyper::Error),
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}
@@ -476,7 +478,7 @@ struct HttpHeaders {
}
impl HttpHeaders {
fn try_parse(headers: &hyper1::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
fn try_parse(headers: &hyper::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
@@ -529,7 +531,7 @@ async fn handle_inner(
ctx: &RequestMonitoring,
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
let _requeset_gauge = Metrics::get()
.proxy
.connection_requests
@@ -577,7 +579,7 @@ async fn handle_db_inner(
conn_info: ConnInfo,
auth: AuthData,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
//
// Determine the destination and connection params
//
@@ -620,6 +622,9 @@ async fn handle_db_inner(
let authenticate_and_connect = Box::pin(
async {
let is_local_proxy =
matches!(backend.config.auth_backend, crate::auth::Backend::Local(_));
let keys = match auth {
AuthData::Password(pw) => {
backend
@@ -639,18 +644,24 @@ async fn handle_db_inner(
&conn_info.user_info,
jwt,
)
.await?;
ComputeCredentials {
info: conn_info.user_info.clone(),
keys: crate::auth::backend::ComputeCredentialKeys::None,
}
.await?
}
};
let client = match keys.keys {
ComputeCredentialKeys::JwtPayload(payload) if is_local_proxy => {
let mut client = backend.connect_to_local_postgres(ctx, conn_info).await?;
client.set_jwt_session(&payload).await?;
Client::Local(client)
}
_ => {
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
Client::Remote(client)
}
};
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.success();
@@ -744,7 +755,7 @@ async fn handle_auth_broker_inner(
conn_info: ConnInfo,
jwt: String,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
backend
.authenticate_with_jwt(
ctx,
@@ -791,7 +802,7 @@ impl QueryData {
self,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client<tokio_postgres::Client>,
client: &mut Client,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let (inner, mut discard) = client.inner();
@@ -865,7 +876,7 @@ impl BatchQueryData {
self,
config: &'static ProxyConfig,
cancel: CancellationToken,
client: &mut Client<tokio_postgres::Client>,
client: &mut Client,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
info!("starting transaction");
@@ -1058,3 +1069,50 @@ async fn query_to_json<T: GenericClient>(
Ok((ready, results))
}
enum Client {
Remote(conn_pool::Client<tokio_postgres::Client>),
Local(local_conn_pool::LocalClient<tokio_postgres::Client>),
}
enum Discard<'a> {
Remote(conn_pool::Discard<'a, tokio_postgres::Client>),
Local(local_conn_pool::Discard<'a, tokio_postgres::Client>),
}
impl Client {
fn metrics(&self) -> Arc<MetricCounter> {
match self {
Client::Remote(client) => client.metrics(),
Client::Local(local_client) => local_client.metrics(),
}
}
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
match self {
Client::Remote(client) => {
let (c, d) = client.inner();
(c, Discard::Remote(d))
}
Client::Local(local_client) => {
let (c, d) = local_client.inner();
(c, Discard::Local(d))
}
}
}
}
impl Discard<'_> {
fn check_idle(&mut self, status: ReadyForQueryStatus) {
match self {
Discard::Remote(discard) => discard.check_idle(status),
Discard::Local(discard) => discard.check_idle(status),
}
}
fn discard(&mut self) {
match self {
Discard::Remote(discard) => discard.discard(),
Discard::Local(discard) => discard.discard(),
}
}
}

View File

@@ -12,7 +12,7 @@ use anyhow::Context as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use framed_websockets::{Frame, OpCode, WebSocketServer};
use futures::{Sink, Stream};
use hyper1::upgrade::OnUpgrade;
use hyper::upgrade::OnUpgrade;
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;

View File

@@ -485,49 +485,51 @@ async fn upload_events_chunk(
#[cfg(test)]
mod tests {
use std::{
net::TcpListener,
sync::{Arc, Mutex},
};
use super::*;
use crate::{http, BranchId, EndpointId};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use hyper::{
service::{make_service_fn, service_fn},
Body, Response,
};
use http_body_util::BodyExt;
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use url::Url;
use super::*;
use crate::{http, BranchId, EndpointId};
#[tokio::test]
async fn metrics() {
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
type Report = EventChunk<'static, Event<Ids, String>>;
let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
let reports = Arc::new(Mutex::new(vec![]));
let reports2 = reports.clone();
let server = hyper::server::Server::from_tcp(listener)
.unwrap()
.serve(make_service_fn(move |_| {
let reports = reports.clone();
async move {
Ok::<_, Error>(service_fn(move |req| {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn({
let reports = reports.clone();
async move {
loop {
if let Ok((stream, _addr)) = listener.accept().await {
let reports = reports.clone();
async move {
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let events: EventChunk<'static, Event<Ids, String>> =
serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(Body::from(vec![])))
}
}))
http1::Builder::new()
.serve_connection(
TokioIo::new(stream),
service_fn(move |req: Request<Incoming>| {
let reports = reports.clone();
async move {
let bytes = req.into_body().collect().await?.to_bytes();
let events = serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(String::new()))
}
}),
)
.await
.unwrap();
}
}
}));
let addr = server.local_addr();
tokio::spawn(server);
}
});
let metrics = Metrics::default();
let client = http::new_client();
@@ -536,7 +538,7 @@ mod tests {
// no counters have been registered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// register a new counter
@@ -548,7 +550,7 @@ mod tests {
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
@@ -558,7 +560,7 @@ mod tests {
// egress should be observered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
@@ -568,7 +570,7 @@ mod tests {
// we do not observe the counter
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered

View File

@@ -12,8 +12,8 @@ use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_histogram_vec, register_int_counter, register_int_counter_pair,
register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -231,6 +231,14 @@ pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(||
.expect("Failed to register metric")
});
pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"safekeeper_evicted_timelines",
"Number of currently evicted timelines"
)
.expect("Failed to register metric")
});
pub const LABEL_UNKNOWN: &str = "unknown";
/// Labels for traffic metrics.

View File

@@ -631,13 +631,19 @@ impl Timeline {
return Err(e);
}
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
self.bootstrap(
shared_state,
conf,
broker_active_set,
partial_backup_rate_limiter,
);
Ok(())
}
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Timeline>,
_shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,

View File

@@ -15,7 +15,9 @@ use tracing::{debug, info, instrument, warn};
use utils::crashsafe::durable_rename;
use crate::{
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
metrics::{
EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, NUM_EVICTED_TIMELINES,
},
rate_limit::rand_duration,
timeline_manager::{Manager, StateSnapshot},
wal_backup,
@@ -93,6 +95,7 @@ impl Manager {
}
info!("successfully evicted timeline");
NUM_EVICTED_TIMELINES.inc();
}
/// Attempt to restore evicted timeline from remote storage; it must be
@@ -128,6 +131,7 @@ impl Manager {
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
info!("successfully restored evicted timeline");
NUM_EVICTED_TIMELINES.dec();
}
}

View File

@@ -25,7 +25,10 @@ use utils::lsn::Lsn;
use crate::{
control_file::{FileStorage, Storage},
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
metrics::{
MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS,
NUM_EVICTED_TIMELINES,
},
rate_limit::{rand_duration, RateLimiter},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
@@ -251,6 +254,11 @@ pub async fn main_task(
mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
}
// If timeline is evicted, reflect that in the metric.
if mgr.is_offloaded {
NUM_EVICTED_TIMELINES.inc();
}
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();
@@ -367,6 +375,11 @@ pub async fn main_task(
mgr.update_wal_removal_end(res);
}
// If timeline is deleted while evicted decrement the gauge.
if mgr.tli.is_cancelled() && mgr.is_offloaded {
NUM_EVICTED_TIMELINES.dec();
}
mgr.set_status(Status::Finished);
}

View File

@@ -165,12 +165,14 @@ impl GlobalTimelines {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
let mut shared_state = tli.write_shared_state().await;
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(
&mut shared_state,
&conf,
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
@@ -213,6 +215,7 @@ impl GlobalTimelines {
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
let mut shared_state = tli.write_shared_state().await;
// TODO: prevent concurrent timeline creation/loading
{
@@ -227,8 +230,13 @@ impl GlobalTimelines {
state.timelines.insert(ttid, tli.clone());
}
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
tli.bootstrap(
&mut shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
);
drop(shared_state);
Ok(tli)
}
// If we can't load a timeline, it's bad. Caller will figure it out.

View File

@@ -17,7 +17,9 @@ use std::time::Duration;
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
use remote_storage::{
DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
};
use tokio::fs::File;
use tokio::select;
@@ -503,8 +505,12 @@ pub async fn read_object(
let cancel = CancellationToken::new();
let opts = DownloadOpts {
byte_start: std::ops::Bound::Included(offset),
..Default::default()
};
let download = storage
.download_storage_object(Some((offset, None)), file_path, &cancel)
.download(file_path, &opts, &cancel)
.await
.with_context(|| {
format!("Failed to open WAL segment download stream for remote path {file_path:?}")

View File

@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
@@ -45,8 +45,15 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,
pub(crate) config: TenantConfig,
/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
pub(crate) observed: ObservedState,
/// Snapshot of the observed state at the point when the reconciler
/// was spawned.
pub(crate) original_observed: ObservedState,
pub(crate) service_config: service::Config,
/// A hook to notify the running postgres instances when we change the location
@@ -846,6 +853,39 @@ impl Reconciler {
}
}
/// Compare the observed state snapshot from when the reconcile was created
/// with the final observed state in order to generate observed state deltas.
pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
let mut deltas = Vec::default();
for (node_id, location) in &self.observed.locations {
let previous_location = self.original_observed.locations.get(node_id);
let do_upsert = match previous_location {
// Location config changed for node
Some(prev) if location.conf != prev.conf => true,
// New location config for node
None => true,
// Location config has not changed for node
_ => false,
};
if do_upsert {
deltas.push(ObservedStateDelta::Upsert(Box::new((
*node_id,
location.clone(),
))));
}
}
for node_id in self.original_observed.locations.keys() {
if !self.observed.locations.contains_key(node_id) {
deltas.push(ObservedStateDelta::Delete(*node_id));
}
}
deltas
}
/// Keep trying to notify the compute indefinitely, only dropping out if:
/// - the node `origin` becomes unavailable -> Ok(())
/// - the node `origin` no longer has our tenant shard attached -> Ok(())

View File

@@ -28,8 +28,8 @@ use crate::{
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
@@ -1072,7 +1072,7 @@ impl Service {
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
))]
fn process_result(&self, mut result: ReconcileResult) {
fn process_result(&self, result: ReconcileResult) {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
@@ -1094,22 +1094,27 @@ impl Service {
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
// make to the tenant
result
.observed
.locations
.retain(|node_id, _loc| nodes.contains_key(node_id));
let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
// make to the tenant
let node = nodes.get(delta.node_id())?;
if node.is_available() {
return Some(delta);
}
// In case a node became unavailable concurrently with the reconcile, observed
// locations on it are now uncertain. By convention, set them to None in order
// for them to get refreshed when the node comes back online.
Some(ObservedStateDelta::Upsert(Box::new((
node.get_id(),
ObservedStateLocation { conf: None },
))))
});
match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
}
tenant.observed = result.observed;
tenant.apply_observed_deltas(deltas);
tenant.waiter.advance(result.sequence);
}
Err(e) => {
@@ -1131,9 +1136,10 @@ impl Service {
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
}
// Skip deletions on reconcile failures
let upsert_deltas =
deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
tenant.apply_observed_deltas(upsert_deltas);
}
}

View File

@@ -425,6 +425,22 @@ pub(crate) enum ReconcileNeeded {
Yes,
}
/// Pending modification to the observed state of a tenant shard.
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
pub(crate) enum ObservedStateDelta {
Upsert(Box<(NodeId, ObservedStateLocation)>),
Delete(NodeId),
}
impl ObservedStateDelta {
pub(crate) fn node_id(&self) -> &NodeId {
match self {
Self::Upsert(up) => &up.0,
Self::Delete(nid) => nid,
}
}
}
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
@@ -437,7 +453,7 @@ pub(crate) struct ReconcileResult {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Option<Generation>,
pub(crate) observed: ObservedState,
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
/// Set [`TenantShard::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
@@ -1123,7 +1139,7 @@ impl TenantShard {
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
observed_deltas: reconciler.observed_deltas(),
pending_compute_notification: reconciler.compute_notify_failure,
}
}
@@ -1177,6 +1193,7 @@ impl TenantShard {
reconciler_config,
config: self.config.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
@@ -1437,6 +1454,62 @@ impl TenantShard {
.map(|(node_id, gen)| (node_id, Generation::new(gen)))
.collect()
}
/// Update the observed state of the tenant by applying incremental deltas
///
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
/// They are then filtered in [`crate::service::Service::process_result`].
pub(crate) fn apply_observed_deltas(
&mut self,
deltas: impl Iterator<Item = ObservedStateDelta>,
) {
for delta in deltas {
match delta {
ObservedStateDelta::Upsert(ups) => {
let (node_id, loc) = *ups;
// If the generation of the observed location in the delta is lagging
// behind the current one, then we have a race condition and cannot
// be certain about the true observed state. Set the observed state
// to None in order to reflect this.
let crnt_gen = self
.observed
.locations
.get(&node_id)
.and_then(|loc| loc.conf.as_ref())
.and_then(|conf| conf.generation);
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
match (crnt_gen, new_gen) {
(Some(crnt), Some(new)) if crnt_gen > new_gen => {
tracing::warn!(
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
node_id, loc, crnt, new
);
self.observed
.locations
.insert(node_id, ObservedStateLocation { conf: None });
continue;
}
_ => {}
}
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
self.observed.locations.insert(node_id, loc);
}
ObservedStateDelta::Delete(node_id) => {
tracing::info!("Deleting observed location {}", node_id);
self.observed.locations.remove(&node_id);
}
}
}
}
}
#[cfg(test)]

View File

@@ -7,7 +7,6 @@ import json
import os
import re
import timeit
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
@@ -25,7 +24,8 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
if TYPE_CHECKING:
from typing import Callable, ClassVar, Optional
from collections.abc import Iterator, Mapping
from typing import Callable, Optional
"""
@@ -141,6 +141,28 @@ class PgBenchRunResult:
)
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
#
# This used to be a class variable on PgBenchInitResult. However later versions
# of Python complain:
#
# ValueError: mutable default <class 'dict'> for field EXTRACTORS is not allowed: use default_factory
#
# When you do what the error tells you to do, it seems to fail our Python 3.9
# test environment. So let's just move it to a private module constant, and move
# on.
_PGBENCH_INIT_EXTRACTORS: Mapping[str, re.Pattern[str]] = {
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
}
@dataclasses.dataclass
class PgBenchInitResult:
total: Optional[float]
@@ -155,20 +177,6 @@ class PgBenchInitResult:
start_timestamp: int
end_timestamp: int
# Taken from https://github.com/postgres/postgres/blob/REL_15_1/src/bin/pgbench/pgbench.c#L5144-L5171
EXTRACTORS: ClassVar[dict[str, re.Pattern[str]]] = dataclasses.field(
default_factory=lambda: {
"drop_tables": re.compile(r"drop tables (\d+\.\d+) s"),
"create_tables": re.compile(r"create tables (\d+\.\d+) s"),
"client_side_generate": re.compile(r"client-side generate (\d+\.\d+) s"),
"server_side_generate": re.compile(r"server-side generate (\d+\.\d+) s"),
"vacuum": re.compile(r"vacuum (\d+\.\d+) s"),
"primary_keys": re.compile(r"primary keys (\d+\.\d+) s"),
"foreign_keys": re.compile(r"foreign keys (\d+\.\d+) s"),
"total": re.compile(r"done in (\d+\.\d+) s"), # Total time printed by pgbench
}
)
@classmethod
def parse_from_stderr(
cls,
@@ -185,7 +193,7 @@ class PgBenchInitResult:
timings: dict[str, Optional[float]] = {}
last_line_items = re.split(r"\(|\)|,", last_line)
for item in last_line_items:
for key, regex in cls.EXTRACTORS.items():
for key, regex in _PGBENCH_INIT_EXTRACTORS.items():
if (m := regex.match(item.strip())) is not None:
if key in timings:
raise RuntimeError(

View File

@@ -23,3 +23,8 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/database_schema?database={database}")
res.raise_for_status()
return res.text
def installed_extensions(self):
res = self.get(f"http://localhost:{self.port}/installed_extensions")
res.raise_for_status()
return res.json()

View File

@@ -446,7 +446,6 @@ class NeonLocalCli(AbstractNeonCli):
if immediate:
cmd.extend(["-m", "immediate"])
log.info(f"Stopping pageserver with {cmd}")
return self.raw_cli(cmd)
def safekeeper_start(

View File

@@ -395,7 +395,7 @@ class NeonEnvBuilder:
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]] = None,
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
pageserver_virtual_file_io_mode: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -449,7 +449,7 @@ class NeonEnvBuilder:
self.storage_controller_port_override = storage_controller_port_override
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
assert test_name.startswith(
"test_"
@@ -1038,7 +1038,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1102,7 +1102,8 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -1407,7 +1408,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
@@ -1433,7 +1434,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
env = builder.init_start()
@@ -1457,7 +1458,7 @@ def neon_env_builder(
pageserver_default_tenant_config_compaction_algorithm: Optional[dict[str, Any]],
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
pageserver_io_buffer_alignment: Optional[int],
pageserver_virtual_file_io_mode: Optional[str],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1492,7 +1493,7 @@ def neon_env_builder(
test_overlay_dir=test_overlay_dir,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
) as builder:
yield builder
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
@@ -3914,7 +3915,6 @@ class Safekeeper(LogUtils):
return self
def stop(self, immediate: bool = False) -> Safekeeper:
log.info(f"Stopping safekeeper {self.id}")
self.env.neon_cli.safekeeper_stop(self.id, immediate)
self.running = False
return self

View File

@@ -41,8 +41,8 @@ def pageserver_virtual_file_io_engine() -> Optional[str]:
@pytest.fixture(scope="function", autouse=True)
def pageserver_io_buffer_alignment() -> Optional[int]:
return None
def pageserver_virtual_file_io_mode() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
@pytest.fixture(scope="function", autouse=True)

View File

@@ -5,13 +5,13 @@ import hashlib
import json
import os
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Union
import boto3
import toml
from moto.server import ThreadedMotoServer
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -43,7 +43,6 @@ class RemoteStorageUser(str, enum.Enum):
class MockS3Server:
"""
Starts a mock S3 server for testing on a port given, errors if the server fails to start or exits prematurely.
Relies that `poetry` and `moto` server are installed, since it's the way the tests are run.
Also provides a set of methods to derive the connection properties from and the method to kill the underlying server.
"""
@@ -53,22 +52,8 @@ class MockS3Server:
port: int,
):
self.port = port
# XXX: do not use `shell=True` or add `exec ` to the command here otherwise.
# We use `self.subprocess.kill()` to shut down the server, which would not "just" work in Linux
# if a process is started from the shell process.
self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", f"-p{port}"])
error = None
try:
return_code = self.subprocess.poll()
if return_code is not None:
error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
except Exception as e:
error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
if error is not None:
log.error(error)
self.kill()
raise RuntimeError("failed to start s3 mock server")
self.server = ThreadedMotoServer(port=port)
self.server.start()
def endpoint(self) -> str:
return f"http://127.0.0.1:{self.port}"
@@ -83,7 +68,7 @@ class MockS3Server:
return "test"
def kill(self):
self.subprocess.kill()
self.server.stop()
@dataclass

View File

@@ -192,7 +192,7 @@ def tpch_queuies() -> tuple[ParameterSet, ...]:
- querues in returning tuple are ordered by the query number
- pytest parameters id is adjusted to match the query id (the numbering starts from 1)
"""
queries_dir = Path(__file__).parent / "performance" / "tpc-h" / "queries"
queries_dir = Path(__file__).parent / "tpc-h" / "queries"
assert queries_dir.exists(), f"TPC-H queries dir not found: {queries_dir}"
return tuple(

View File

@@ -0,0 +1,87 @@
from logging import info
from fixtures.neon_fixtures import NeonEnv
def test_installed_extensions(neon_simple_env: NeonEnv):
"""basic test for the endpoint that returns the list of installed extensions"""
env = neon_simple_env
env.create_branch("test_installed_extensions")
endpoint = env.endpoints.create_start("test_installed_extensions")
endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")
client = endpoint.http_client()
res = client.installed_extensions()
info("Extensions list: %s", res)
info("Extensions: %s", res["extensions"])
# 'plpgsql' is a default extension that is always installed.
assert any(
ext["extname"] == "plpgsql" and ext["versions"] == ["1.0"] for ext in res["extensions"]
), "The 'plpgsql' extension is missing"
# check that the neon_test_utils extension is not installed
assert not any(
ext["extname"] == "neon_test_utils" for ext in res["extensions"]
), "The 'neon_test_utils' extension is installed"
pg_conn = endpoint.connect(dbname="test_installed_extensions")
with pg_conn.cursor() as cur:
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute(
"SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
)
res = cur.fetchone()
neon_test_utils_version = res[0]
with pg_conn.cursor() as cur:
cur.execute("CREATE EXTENSION neon version '1.1'")
pg_conn_2 = endpoint.connect(dbname="test_installed_extensions_2")
with pg_conn_2.cursor() as cur:
cur.execute("CREATE EXTENSION neon version '1.2'")
res = client.installed_extensions()
info("Extensions list: %s", res)
info("Extensions: %s", res["extensions"])
# check that the neon_test_utils extension is installed only in 1 database
# and has the expected version
assert any(
ext["extname"] == "neon_test_utils"
and ext["versions"] == [neon_test_utils_version]
and ext["n_databases"] == 1
for ext in res["extensions"]
)
# check that the plpgsql extension is installed in all databases
# this is a default extension that is always installed
assert any(ext["extname"] == "plpgsql" and ext["n_databases"] == 4 for ext in res["extensions"])
# check that the neon extension is installed and has expected versions
for ext in res["extensions"]:
if ext["extname"] == "neon":
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.1", "1.2"]
with pg_conn.cursor() as cur:
cur.execute("ALTER EXTENSION neon UPDATE TO '1.3'")
res = client.installed_extensions()
info("Extensions list: %s", res)
info("Extensions: %s", res["extensions"])
# check that the neon_test_utils extension is updated
for ext in res["extensions"]:
if ext["extname"] == "neon":
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.2", "1.3"]

View File

@@ -666,14 +666,17 @@ def test_upgrade_generationless_local_file_paths(
pageserver.stop()
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
files_renamed = 0
log.info(f"Renaming files in {timeline_dir}")
for filename in os.listdir(timeline_dir):
path = os.path.join(timeline_dir, filename)
log.info(f"Found file {path}")
if path.endswith("-v1-00000001"):
new_path = path[:-12]
os.rename(path, new_path)
log.info(f"Renamed {path} -> {new_path}")
if filename.endswith("-v1-00000001"):
new_filename = filename[:-12]
os.rename(
os.path.join(timeline_dir, filename), os.path.join(timeline_dir, new_filename)
)
log.info(f"Renamed {filename} -> {new_filename}")
files_renamed += 1
else:
log.info(f"Keeping {filename}")
assert files_renamed > 0

View File

@@ -14,7 +14,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.errors
@@ -735,7 +735,7 @@ class ProposerPostgres(PgProtocol):
"""Path to postgresql.conf"""
return os.path.join(self.pgdata_dir, "postgresql.conf")
def create_dir_config(self, safekeepers: str, additional_config_options: Optional[dict[str,str]] = None):
def create_dir_config(self, safekeepers: str):
"""Create dir and config for running --sync-safekeepers"""
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
@@ -750,9 +750,6 @@ class ProposerPostgres(PgProtocol):
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
if additional_config_options:
for key, value in additional_config_options.items():
cfg.append(f"{key} = '{value}'\n")
f.writelines(cfg)
@@ -1449,7 +1446,6 @@ class SafekeeperEnv:
pg_bin: PgBin,
neon_binpath: Path,
num_safekeepers: int = 1,
pg_conf_options: Optional[dict[str, str]] = None,
):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
@@ -1461,7 +1457,6 @@ class SafekeeperEnv:
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[TenantId] = None
self.timeline_id: Optional[TimelineId] = None
self.pg_conf_options = pg_conf_options
def init(self) -> SafekeeperEnv:
assert self.postgres is None, "postgres is already initialized"
@@ -1504,7 +1499,6 @@ class SafekeeperEnv:
str(i),
"--broker-endpoint",
self.fake_broker_endpoint,
# "--no-sync",
]
log.info(f'Running command "{" ".join(cmd)}"')
@@ -1539,7 +1533,7 @@ class SafekeeperEnv:
self.port_distributor.get_port(),
)
pg.initdb()
pg.create_dir_config(self.get_safekeeper_connstrs(), self.pg_conf_options)
pg.create_dir_config(self.get_safekeeper_connstrs())
return pg
def kill_safekeeper(self, sk_dir):
@@ -1564,224 +1558,6 @@ class SafekeeperEnv:
self.kill_safekeeper(sk_proc.args[6])
def run_pg_restore(pg_bin: PgBin, dump_file: Path, postgres: ProposerPostgres, table_names: List[str]):
# env_vars = {
# "PGOPTIONS": "-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7",
# }
postgres.connstr
pg_restore_command: List[str] = [
"pg_restore",
"-v", # Verbose output
"-d", postgres.connstr(options='-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7 -cstatement_timeout=0'), # Target database
"--no-owner", # Do not restore ownership
"--jobs=4", # Number of parallel jobs
str(dump_file), # Dump file
]
# Add table names to the command
for table in table_names:
pg_restore_command.insert(-2, "-t")
pg_restore_command.insert(-2, table)
pg_bin.run(pg_restore_command)
return None
def download_pg_dump(database_name: str) -> Path:
dump_file_path = Path(f"/tmp/{database_name}.pg_dump")
if not dump_file_path.exists():
s3_path = f"s3://neon-github-dev/performance/pgdumps/{database_name}/{database_name}.pg_dump"
try:
log.info(f"Downloading {s3_path} to {dump_file_path}")
subprocess.run(["aws", "s3", "cp", s3_path, str(dump_file_path)], check=True)
except subprocess.CalledProcessError:
log.error("Failed to download the pg_dump file. Ensure AWS S3 credentials are set in the environment variables.")
raise
return dump_file_path
def test_safekeeper_without_pageserver_and_pg_restore(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
# Download the pg_dump file if it doesn't exist
database_name = "clickbench" # Replace with your database name
dump_file_path = download_pg_dump(database_name)
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before restore: {size_before}")
start_time = time.time()
run_pg_restore(pg_bin, dump_file_path, env.postgres, ["hits"])
end_time = time.time()
duration = end_time - start_time
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after restore: {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"pg_restore duration: {duration:.2f} seconds")
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver_and_pg_restore_tpch(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
# Download the pg_dump file if it doesn't exist
database_name = "tpch" # Replace with your database name
dump_file_path = download_pg_dump(database_name)
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before restore: {size_before}")
table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
start_time = time.time()
run_pg_restore(pg_bin, dump_file_path, env.postgres, table_names)
end_time = time.time()
duration = end_time - start_time
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after restore: {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"pg_restore duration: {duration:.2f} seconds")
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver_and_waltest(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
wal_test = """
CREATE OR REPLACE FUNCTION public.wal_bandwidth_test()
RETURNS text
LANGUAGE plpgsql
AS $function$
declare
i int;
j int;
lastlsn pg_lsn;
nowlsn pg_lsn;
lastts timestamp;
nowts timestamp;
bandwidth numeric;
result text := ''; -- Initialize an empty string to capture messages
begin
for i in 1..100 loop
lastlsn = pg_current_wal_insert_lsn();
lastts = clock_timestamp();
-- Emit 100 MB of WAL
for j in 1..10000 loop
perform pg_logical_emit_message(false, '', repeat('x', 10486));
end loop;
nowlsn = pg_current_wal_insert_lsn();
nowts = clock_timestamp();
bandwidth = (nowlsn - lastlsn) / (extract(epoch from nowts) - extract(epoch from lastts));
-- Capture the message instead of raising a notice
result := result || format('bandwidth: %s kB / s%s',
lpad(round(bandwidth / 1024)::text, 10),
chr(10)); -- Newline for formatting
end loop;
return result; -- Return the concatenated string of messages
end;
$function$
;
"""
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before test: {size_before}")
#env.postgres.safe_psql("create extension neon;")
env.postgres.safe_psql(wal_test)
start_time = time.time()
output = env.postgres.safe_psql("""
SET statement_timeout = 0;
select wal_bandwidth_test();
""")[0][0]
end_time = time.time()
duration = end_time - start_time
log.info(output)
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after test (irrelevant because no real WAL records, no relations): {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"test duration: {duration:.2f} seconds")
log.info(f"Average ingest rate(irrelevant because no real WAL records, no relations): {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver(
test_output_dir: str,
port_distributor: PortDistributor,
@@ -2538,12 +2314,12 @@ def test_s3_eviction(
]
if delete_offloaded_wal:
neon_env_builder.safekeeper_extra_opts.append("--delete-offloaded-wal")
env = neon_env_builder.init_start(
initial_tenant_conf={
"checkpoint_timeout": "100ms",
}
)
# make lagging_wal_timeout small to force pageserver quickly forget about
# safekeeper after it stops sending updates (timeline is deactivated) to
# make test faster. Won't be needed with
# https://github.com/neondatabase/neon/issues/8148 fixed.
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
n_timelines = 5
@@ -2631,9 +2407,37 @@ def test_s3_eviction(
and sk.log_contains("successfully restored evicted timeline")
for sk in env.safekeepers
)
assert event_metrics_seen
# test safekeeper_evicted_timelines metric
log.info("testing safekeeper_evicted_timelines metric")
# checkpoint pageserver to force remote_consistent_lsn update
for i in range(n_timelines):
ps_client.timeline_checkpoint(env.initial_tenant, timelines[i], wait_until_uploaded=True)
for ep in endpoints:
log.info(ep.is_running())
sk = env.safekeepers[0]
# all timelines must be evicted eventually
def all_evicted():
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
assert n_evicted # make mypy happy
assert int(n_evicted) == n_timelines
wait_until(60, 0.5, all_evicted)
# restart should preserve the metric value
sk.stop().start()
wait_until(60, 0.5, all_evicted)
# and endpoint start should reduce is
endpoints[0].start()
def one_unevicted():
n_evicted = sk.http_client().get_metric_value("safekeeper_evicted_timelines")
assert n_evicted # make mypy happy
assert int(n_evicted) < n_timelines
wait_until(60, 0.5, one_unevicted)
# Test resetting uploaded partial segment state.
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):

View File

@@ -58,6 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] }
prost = { version = "0.13", features = ["prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
@@ -66,7 +67,7 @@ regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
serde_json = { version = "1", features = ["alloc", "raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
@@ -76,6 +77,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-sys = { version = "0.5" }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }