Compare commits

..

18 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3984b5c03f Update test_compute_restart.py 2024-10-17 14:37:24 +03:00
Konstantin Knizhnik
aa3ef2f048 Update test_compute_restart.py 2024-10-17 10:29:08 +03:00
Konstantin Knizhnik
5f37c3802e Add test restarting compute node to investigate flakiness of test_subscriber_restart 2024-10-17 08:52:56 +03:00
Konstantin Knizhnik
934dbb61f5 Check access_count in lfc_evict (#9407)
## Problem

See
https://neondb.slack.com/archives/C033A2WE6BZ/p1729007738526309?thread_ts=1722942856.987979&cid=C033A2WE6BZ

When a replica receives a WAL record whose target page is not present in
shared buffers, we evict this page from the LFC.
If all pages of an LFC chunk are evicted, the chunk is moved to the
head of the LRU list to force its reuse.
Unfortunately, `access_count` is not checked, and if the entry is being
accessed at that moment this operation can corrupt the LRU list.

## Summary of changes

Check `access_count` in `lfc_evict`

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-10-17 08:04:57 +03:00
Christian Schwarz
67d5d98b19 readme: fix build instructions for debian 12 (#9371)
We need libprotobuf-dev for some of the
`/usr/include/google/protobuf/...*.proto` files
referenced by our protobuf declarations.
2024-10-16 21:47:53 +02:00
Tristan Partin
e0fa6bcf1a Fix some sql_exporter metrics for PG 17
Checkpointer-related statistics moved from pg_stat_bgwriter to
pg_stat_checkpointer in PG 17, so we need to adjust our queries accordingly.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-16 14:46:33 -05:00
Tristan Partin
409a286eaa Fix typo in sql_exporter generator
Seemingly a bad copy-paste. It manifested as a failure of sql_exporter to
start, which was just dying in a loop in staging. A future PR
will add E2E testing of sql_exporter.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-16 13:08:40 -05:00
Arpad Müller
0551cfb6a7 Fix beta clippy warnings (#9419)
```
warning: first doc comment paragraph is too long
  --> compute_tools/src/installed_extensions.rs:35:1
   |
35 | / /// Connect to every database (see list_dbs above) and get the list of installed extensions.
36 | | /// Same extension can be installed in multiple databases with different versions,
37 | | /// we only keep the highest and lowest version across all databases.
   | |_
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#too_long_first_doc_paragraph
   = note: `#[warn(clippy::too_long_first_doc_paragraph)]` on by default
help: add an empty line
   |
35 ~ /// Connect to every database (see list_dbs above) and get the list of installed extensions.
36 + ///
   |
```
2024-10-16 19:04:56 +01:00
Folke Behrens
ed694732e7 proxy: merge AuthError and AuthErrorImpl (#9418)
Since GetAuthInfoError now boxes the ControlPlaneError message, that
variant is no longer large and AuthError fits in 32 bytes.
2024-10-16 19:10:49 +02:00
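Below is a minimal, self-contained sketch (not code from the commit above, ed694732e7; the payload type and the derive setup are assumptions) of the pattern it applies: once the large variant's payload is boxed, the extra `AuthError(Box<AuthErrorImpl>)` newtype wrapper becomes unnecessary and the enum can serve as the public error type directly.
```rust
use thiserror::Error;

// Hypothetical stand-in for the real control-plane error; the big message is boxed
// so the enum variant that carries it stays small.
#[derive(Debug, Error)]
#[error("control plane error: {message}")]
struct ControlPlaneError {
    message: Box<str>,
}

// Before the merge, AuthError was a `struct AuthError(Box<AuthErrorImpl>)` wrapper to keep
// the value cheap to move. With the payload boxed inside the variant, the enum itself is
// small (32 bytes in the PR), so it can be used directly.
#[derive(Debug, Error)]
enum AuthError {
    #[error(transparent)]
    GetAuthInfo(#[from] ControlPlaneError),
    #[error("authentication failed for user '{0}'")]
    AuthFailed(Box<str>),
}

fn main() {
    println!("size of AuthError: {} bytes", std::mem::size_of::<AuthError>());
}
```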
Alex Chi Z.
8a114e3aed refactor(pageserver): upgrade remote_storage to use hyper1 (#9405)
part of https://github.com/neondatabase/neon/issues/9255

## Summary of changes

Upgrade the remote_storage crate to hyper 1. Hyper 0 was used when
providing the streaming HTTP body to the S3 SDK; that path is refactored
to use hyper 1.


Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-10-16 16:19:45 +01:00
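The `remote_storage` diff further down shows the crux of the change in 8a114e3aed: `hyper 0.14`'s `Body::wrap_stream` is replaced by wrapping each chunk in a hyper 1 `Frame` and feeding the stream through `http-body-util`'s `StreamBody`. A rough sketch of that conversion (the dummy chunk stream and crate setup are assumptions, not code from this PR):
```rust
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use http_body_util::StreamBody;
use hyper::body::Frame;

fn main() {
    // Hypothetical source of chunks, standing in for the stream remote_storage uploads.
    let chunks = stream::iter(vec![Ok::<_, std::io::Error>(Bytes::from_static(b"chunk"))]);

    // hyper 0.14 style was roughly: let body = hyper::Body::wrap_stream(chunks);
    // hyper 1.x: wrap each chunk in a data Frame and build a body from the stream.
    let _body = StreamBody::new(chunks.map(|res| res.map(Frame::data)));
    // The diff then hands this to the AWS SDK via SdkBody::from_body_1_x.
}
```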
Arpad Müller
55b246085e Activate timelines during unoffload (#9399)
The current code forgets to activate timelines during unoffload,
making it impossible to serve a basebackup because the timeline is
still in the Loading state.

```
  stderr:
    command failed: compute startup failed: failed to get basebackup@0/0 from pageserver postgresql://no_user@localhost:15014

    Caused by:
        0: db error: ERROR: Not found: Timeline 508546c79b2b16a84ab609fdf966e0d3/bfc18c24c4b837ecae5dbb5216c80fce is not active, state: Loading
        1: ERROR: Not found: Timeline 508546c79b2b16a84ab609fdf966e0d3/bfc18c24c4b837ecae5dbb5216c80fce is not active, state: Loading
```

Therefore, also activate the timeline during unoffloading.

Part of #8088
2024-10-16 16:47:17 +02:00
Anastasia Lubennikova
9668601f46 Add support of extensions for v17 (part 2) (#9389)
- plv8 3.2.3
- HypoPG 1.4.1
- pgtap 1.3.3
- timescaledb 2.17.0
- pg_hint_plan 17_1_7_0
- rdkit Release_2024_09_1
- pg_uuidv7 1.6.0
- wal2json 2.6
- pg_ivm 1.9
- pg_partman 5.1.0

Update support of extensions for v14-v16:
- HypoPG 1.4.0 -> 1.4.1
- pgtap 1.2.0 -> 1.3.3
- plpgsql_check 2.5.3 -> 2.7.11
- pg_uuidv7 1.0.1 -> 1.6.0
- wal2json 2.5 -> 2.6
- pg_ivm 1.7 -> 1.9
- pg_partman 5.0.1 -> 5.1.0
2024-10-16 15:29:23 +01:00
Arpad Müller
3140c14d60 Remove allow(clippy::unknown_lints) (#9416)
The lint stabilized in 1.80.
2024-10-16 16:28:55 +02:00
John Spray
d6281cbe65 tests: stabilize test_timelines_parallel_endpoints (#9413)
## Problem

This test would get failures like `command failed: Found no timeline id
for branch name 'branch_8'`

It's because neon_local is being invoked concurrently for branch
creation, which is unsafe (the invocations step on each other's JSON writes).

Example failure:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-9410/11363051979/index.html#testresult/5ddc56c640f5422b/retries

## Summary of changes

- Don't do branch creation concurrently with endpoint creation via neon_local
2024-10-16 15:27:46 +01:00
Vlad Lazar
d490ad23e0 storcon: use the same trace fields for reconciler and results (#9410)
## Problem

The reconciler uses `seq`, but processing of results uses `sequence`.
The field order differs too. This makes the logs annoying to read.

## Summary of Changes

Use the same tracing fields in both
2024-10-16 14:04:17 +01:00
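A tiny illustration (hypothetical span and field names, not the storage controller's actual code in d490ad23e0) of the fix: when both sides emit spans with identical field names in the same order, a single log filter such as `seq=42` matches the reconciler and the result-processing path alike.
```rust
use tracing::info_span;

fn main() {
    let sequence: u64 = 42;

    // Before: one path logged `seq=...`, the other `sequence=...`, in a different order.
    // After: both paths use the same tracing fields, so log lines line up and grep the same way.
    let reconcile = info_span!("reconcile", seq = sequence, tenant_id = "t-1", shard_id = 0);
    let results = info_span!("handle_result", seq = sequence, tenant_id = "t-1", shard_id = 0);

    let _guard = reconcile.enter();
    drop(_guard);
    let _guard = results.enter();
}
```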
Folke Behrens
f14e45f0ce proxy: format imports with nightly rustfmt (#9414)
```shell
cargo +nightly fmt -p proxy -- -l --config imports_granularity=Module,group_imports=StdExternalCrate,reorder_imports=true
```

These rust-analyzer settings for VSCode should help retain this style:
```json
  "rust-analyzer.imports.group.enable": true,
  "rust-analyzer.imports.prefix": "crate",
  "rust-analyzer.imports.merge.glob": false,
  "rust-analyzer.imports.granularity.group": "module",
  "rust-analyzer.imports.granularity.enforce": true,
```
2024-10-16 15:01:56 +02:00
John Spray
89a65a9e5a pageserver: improve handling of archival_config calls during Timeline shutdown (#9415)
## Problem

In test `test_timeline_offloading`, we see failures like:
```
PageserverApiException: queue is in state Stopped
```

Example failure:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/11356917668/index.html#testresult/ff0e348a78a974ee/retries

## Summary of changes

- Amend code paths that handle errors from RemoteTimelineClient to check
for cancellation and emit the Cancelled error variant in these cases
(will give clients a 503 to retry)
- Remove the implicit `#[from]` for the Other error case, to make it
harder to add code that accidentally squashes errors into this
(500-equivalent) error variant.

This would be neater if we made RemoteTimelineClient return a structured
error instead of anyhow::Error, but that's a bigger refactor.

I'm not sure if the test really intends to hit this path, but the error
handling fix makes sense either way.
2024-10-16 13:39:58 +01:00
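Sketched below with stand-in types (the real error enum and remote client live in the pageserver's tenant module; this only illustrates the error-mapping pattern 89a65a9e5a describes): a failure from the remote client is translated into `Cancelled` when the timeline is shutting down, instead of being squashed into the catch-all `Other` variant.
```rust
use tokio_util::sync::CancellationToken;

// Illustrative stand-ins for the real pageserver types.
#[derive(Debug)]
enum TimelineArchivalError {
    Cancelled,
    Other(anyhow::Error),
}

struct Timeline {
    cancel: CancellationToken,
}

// Pretend the remote client rejects the call because its upload queue is stopped.
fn schedule_index_upload(_timeline: &Timeline) -> anyhow::Result<bool> {
    anyhow::bail!("queue is in state Stopped")
}

fn set_archival_state(timeline: &Timeline) -> Result<bool, TimelineArchivalError> {
    match schedule_index_upload(timeline) {
        Ok(upload_needed) => Ok(upload_needed),
        // Previously an implicit `#[from]` conversion folded this into Other (a 500).
        // Checking the cancellation token turns shutdown races into a retryable error (503).
        Err(_) if timeline.cancel.is_cancelled() => Err(TimelineArchivalError::Cancelled),
        Err(e) => Err(TimelineArchivalError::Other(e)),
    }
}

fn main() {
    let timeline = Timeline { cancel: CancellationToken::new() };
    timeline.cancel.cancel();
    println!("{:?}", set_archival_state(&timeline));
}
```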
Cihan Demirci
bc6b8cee01 don't trigger workflows in two repos (#9340)
https://github.com/neondatabase/cloud/issues/16723
2024-10-16 10:43:48 +01:00
96 changed files with 1074 additions and 1012 deletions

View File

@@ -1100,7 +1100,6 @@ jobs:
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/azure run deploy.yml -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
-f deployPgSniRouter=false \

Cargo.lock generated
View File

@@ -4648,9 +4648,10 @@ dependencies = [
"camino-tempfile",
"futures",
"futures-util",
"http-body-util",
"http-types",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.4.1",
"itertools 0.10.5",
"metrics",
"once_cell",

View File

@@ -31,7 +31,7 @@ See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more informati
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev cmake postgresql-client protobuf-compiler \
libcurl4-openssl-dev openssl python3-poetry lsof libicu-dev
libprotobuf-dev libcurl4-openssl-dev openssl python3-poetry lsof libicu-dev
```
* On Fedora, these packages are needed:
```bash

View File

@@ -18,13 +18,14 @@ RUN case $DEBIAN_VERSION in \
# Version-specific installs for Bullseye (PG14-PG16):
# The h3_pg extension needs a cmake 3.20+, but Debian bullseye has 3.18.
# Install newer version (3.25) from backports.
# libstdc++-10-dev is required for plv8
bullseye) \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports"; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports libstdc++-10-dev"; \
;; \
# Version-specific installs for Bookworm (PG17):
bookworm) \
VERSION_INSTALLS="cmake"; \
VERSION_INSTALLS="cmake libstdc++-12-dev"; \
;; \
*) \
echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
@@ -227,18 +228,33 @@ FROM build-deps AS plv8-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
apt update && \
RUN apt update && \
apt install --no-install-recommends -y ninja-build python3-dev libncurses5 binutils clang
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
# plv8 3.2.3 supports v17
# last release v3.2.3 - Sep 7, 2024
#
# clone the repo instead of downloading the release tarball because plv8 has submodule dependencies
# and the release tarball doesn't include them
#
# Use new version only for v17
# because since v3.2, plv8 doesn't include plcoffee and plls extensions
ENV PLV8_TAG=v3.2.3
RUN case "${PG_VERSION}" in \
"v17") \
export PLV8_TAG=v3.2.3 \
;; \
"v14" | "v15" | "v16") \
export PLV8_TAG=v3.1.10 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.10.tar.gz -O plv8.tar.gz && \
echo "7096c3290928561f0d4901b7a52794295dc47f6303102fae3f8e42dd575ad97d plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xzf ../plv8.tar.gz --strip-components=1 -C . && \
git clone --recurse-submodules --depth 1 --branch ${PLV8_TAG} https://github.com/plv8/plv8.git plv8-src && \
tar -czf plv8.tar.gz --exclude .git plv8-src && \
cd plv8-src && \
# generate and copy upgrade scripts
mkdir -p upgrade && ./generate_upgrade.sh 3.1.10 && \
cp upgrade/* /usr/local/pgsql/share/extension/ && \
@@ -248,8 +264,17 @@ RUN case "${PG_VERSION}" in "v17") \
find /usr/local/pgsql/ -name "plv8-*.so" | xargs strip && \
# don't break computes with installed old version of plv8
cd /usr/local/pgsql/lib/ && \
ln -s plv8-3.1.10.so plv8-3.1.5.so && \
ln -s plv8-3.1.10.so plv8-3.1.8.so && \
case "${PG_VERSION}" in \
"v17") \
ln -s plv8-3.2.3.so plv8-3.1.8.so && \
ln -s plv8-3.2.3.so plv8-3.1.5.so && \
ln -s plv8-3.2.3.so plv8-3.1.10.so \
;; \
"v14" | "v15" | "v16") \
ln -s plv8-3.1.10.so plv8-3.1.5.so && \
ln -s plv8-3.1.10.so plv8-3.1.8.so \
;; \
esac && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plcoffee.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plls.control
@@ -327,6 +352,9 @@ COPY compute/patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
#
# v17 is not supported yet because of upstream issue
# https://github.com/pgvector/pgvector/issues/669
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
@@ -366,11 +394,10 @@ FROM build-deps AS hypopg-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.4.0.tar.gz -O hypopg.tar.gz && \
echo "0821011743083226fc9b813c1f2ef5897a91901b57b6bea85a78e466187c6819 hypopg.tar.gz" | sha256sum --check && \
# HypoPG 1.4.1 supports v17
# last release 1.4.1 - Apr 28, 2024
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.4.1.tar.gz -O hypopg.tar.gz && \
echo "9afe6357fd389d8d33fad81703038ce520b09275ec00153c6c89282bcdedd6bc hypopg.tar.gz" | sha256sum --check && \
mkdir hypopg-src && cd hypopg-src && tar xzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -407,6 +434,9 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY compute/patches/rum.patch /rum.patch
# maybe version-specific
# support for v17 is unknown
# last release 1.3.13 - Sep 19, 2022
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
@@ -428,11 +458,10 @@ FROM build-deps AS pgtap-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
echo "9c7c3de67ea41638e14f06da5da57bac6f5bd03fea05c165a0ec862205a5c052 pgtap.tar.gz" | sha256sum --check && \
# pgtap 1.3.3 supports v17
# last release v1.3.3 - Apr 8, 2024
RUN wget https://github.com/theory/pgtap/archive/refs/tags/v1.3.3.tar.gz -O pgtap.tar.gz && \
echo "325ea79d0d2515bce96bce43f6823dcd3effbd6c54cb2a4d6c2384fffa3a14c7 pgtap.tar.gz" | sha256sum --check && \
mkdir pgtap-src && cd pgtap-src && tar xzf ../pgtap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -505,11 +534,10 @@ FROM build-deps AS plpgsql-check-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
# plpgsql_check v2.7.11 supports v17
# last release v2.7.11 - Sep 16, 2024
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.7.11.tar.gz -O plpgsql_check.tar.gz && \
echo "208933f8dbe8e0d2628eb3851e9f52e6892b8e280c63700c0f1ce7883625d172 plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
@@ -527,18 +555,19 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
case "${PG_VERSION}" in \
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export TIMESCALEDB_VERSION=2.10.1 \
export TIMESCALEDB_CHECKSUM=6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 \
;; \
*) \
"v16") \
export TIMESCALEDB_VERSION=2.13.0 \
export TIMESCALEDB_CHECKSUM=584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d \
;; \
"v17") \
export TIMESCALEDB_VERSION=2.17.0 \
export TIMESCALEDB_CHECKSUM=155bf64391d3558c42f31ca0e523cfc6252921974f75298c9039ccad1c89811a \
;; \
esac && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/${TIMESCALEDB_VERSION}.tar.gz -O timescaledb.tar.gz && \
echo "${TIMESCALEDB_CHECKSUM} timescaledb.tar.gz" | sha256sum --check && \
@@ -561,10 +590,8 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
case "${PG_VERSION}" in \
# version-specific, has separate releases for each version
RUN case "${PG_VERSION}" in \
"v14") \
export PG_HINT_PLAN_VERSION=14_1_4_1 \
export PG_HINT_PLAN_CHECKSUM=c3501becf70ead27f70626bce80ea401ceac6a77e2083ee5f3ff1f1444ec1ad1 \
@@ -578,7 +605,8 @@ RUN case "${PG_VERSION}" in "v17") \
export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
;; \
"v17") \
echo "TODO: PG17 pg_hint_plan support" && exit 0 \
export PG_HINT_PLAN_VERSION=17_1_7_0 \
export PG_HINT_PLAN_CHECKSUM=06dd306328c67a4248f48403c50444f30959fb61ebe963248dbc2afb396fe600 \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
@@ -602,6 +630,10 @@ FROM build-deps AS pg-cron-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# 1.6.4 available, supports v17
# This is an experimental extension that we do not support on prod yet.
# !Do not remove!
# We set it in shared_preload_libraries and computes will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
@@ -623,23 +655,37 @@ FROM build-deps AS rdkit-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
apt-get update && \
RUN apt-get update && \
apt-get install --no-install-recommends -y \
libboost-iostreams1.74-dev \
libboost-regex1.74-dev \
libboost-serialization1.74-dev \
libboost-system1.74-dev \
libeigen3-dev
libeigen3-dev \
libboost-all-dev
# rdkit Release_2024_09_1 supports v17
# last release Release_2024_09_1 - Sep 27, 2024
#
# Use new version only for v17
# because Release_2024_09_1 has some backward incompatible changes
# https://github.com/rdkit/rdkit/releases/tag/Release_2024_09_1
ENV PATH="/usr/local/pgsql/bin/:/usr/local/pgsql/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
RUN case "${PG_VERSION}" in \
"v17") \
export RDKIT_VERSION=Release_2024_09_1 \
export RDKIT_CHECKSUM=034c00d6e9de323506834da03400761ed8c3721095114369d06805409747a60f \
;; \
"v14" | "v15" | "v16") \
export RDKIT_VERSION=Release_2023_03_3 \
export RDKIT_CHECKSUM=bdbf9a2e6988526bfeb8c56ce3cdfe2998d60ac289078e2215374288185e8c8d \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_3.tar.gz -O rdkit.tar.gz && \
echo "bdbf9a2e6988526bfeb8c56ce3cdfe2998d60ac289078e2215374288185e8c8d rdkit.tar.gz" | sha256sum --check && \
wget https://github.com/rdkit/rdkit/archive/refs/tags/${RDKIT_VERSION}.tar.gz -O rdkit.tar.gz && \
echo "${RDKIT_CHECKSUM} rdkit.tar.gz" | sha256sum --check && \
mkdir rdkit-src && cd rdkit-src && tar xzf ../rdkit.tar.gz --strip-components=1 -C . && \
cmake \
-D RDK_BUILD_CAIRO_SUPPORT=OFF \
@@ -678,12 +724,11 @@ FROM build-deps AS pg-uuidv7-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# not version-specific
# last release v1.6.0 - Oct 9, 2024
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.6.0.tar.gz -O pg_uuidv7.tar.gz && \
echo "0fa6c710929d003f6ce276a7de7a864e9d1667b2d78be3dc2c07f2409eb55867 pg_uuidv7.tar.gz" | sha256sum --check && \
mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -754,6 +799,8 @@ RUN case "${PG_VERSION}" in \
FROM build-deps AS pg-embedding-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# This is our extension, support stopped in favor of pgvector
# TODO: deprecate it
ARG PG_VERSION
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in \
@@ -780,6 +827,8 @@ FROM build-deps AS pg-anon-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
@@ -946,13 +995,12 @@ FROM build-deps AS wal2json-pg-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# wal2json wal2json_2_6 supports v17
# last release wal2json_2_6 - Apr 25, 2024
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "We'll need to update wal2json to 2.6+ for pg17 support" && exit 0;; \
esac && \
wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.gz && \
echo "b516653575541cf221b99cf3f8be9b6821f6dbcfc125675c85f35090f824f00e wal2json_2_5.tar.gz" | sha256sum --check && \
mkdir wal2json-src && cd wal2json-src && tar xzf ../wal2json_2_5.tar.gz --strip-components=1 -C . && \
RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_6.tar.gz -O wal2json.tar.gz && \
echo "18b4bdec28c74a8fc98a11c72de38378a760327ef8e5e42e975b0029eb96ba0d wal2json.tar.gz" | sha256sum --check && \
mkdir wal2json-src && cd wal2json-src && tar xzf ../wal2json.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install
@@ -966,12 +1014,11 @@ FROM build-deps AS pg-ivm-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# pg_ivm v1.9 supports v17
# last release v1.9 - Jul 31
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "We'll need to update pg_ivm to 1.9+ for pg17 support" && exit 0;; \
esac && \
wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.7.tar.gz -O pg_ivm.tar.gz && \
echo "ebfde04f99203c7be4b0e873f91104090e2e83e5429c32ac242d00f334224d5e pg_ivm.tar.gz" | sha256sum --check && \
RUN wget https://github.com/sraoss/pg_ivm/archive/refs/tags/v1.9.tar.gz -O pg_ivm.tar.gz && \
echo "59e15722939f274650abf637f315dd723c87073496ca77236b044cb205270d8b pg_ivm.tar.gz" | sha256sum --check && \
mkdir pg_ivm-src && cd pg_ivm-src && tar xzf ../pg_ivm.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -987,12 +1034,11 @@ FROM build-deps AS pg-partman-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# should support v17 https://github.com/pgpartman/pg_partman/discussions/693
# last release 5.1.0 Apr 2, 2024
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in "v17") \
echo "pg_partman doesn't support PG17 yet" && exit 0;; \
esac && \
wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.0.1.tar.gz -O pg_partman.tar.gz && \
echo "75b541733a9659a6c90dbd40fccb904a630a32880a6e3044d0c4c5f4c8a65525 pg_partman.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.1.0.tar.gz -O pg_partman.tar.gz && \
echo "3e3a27d7ff827295d5c55ef72f07a49062d6204b3cb0b9a048645d6db9f3cb9f pg_partman.tar.gz" | sha256sum --check && \
mkdir pg_partman-src && cd pg_partman-src && tar xzf ../pg_partman.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -1175,12 +1221,13 @@ RUN rm /usr/local/pgsql/lib/lib*.a
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS sql_exporter_preprocessor
ARG PG_VERSION
USER nonroot
COPY --chown=nonroot compute compute
RUN make -C compute
RUN make PG_VERSION="${PG_VERSION}" -C compute
#########################################################################################
#

View File

@@ -6,13 +6,15 @@ jsonnet_files = $(wildcard \
all: neon_collector.yml neon_collector_autoscaling.yml sql_exporter.yml sql_exporter_autoscaling.yml
neon_collector.yml: $(jsonnet_files)
JSONNET_PATH=etc jsonnet \
JSONNET_PATH=jsonnet:etc jsonnet \
--output-file etc/$@ \
--ext-str pg_version=$(PG_VERSION) \
etc/neon_collector.jsonnet
neon_collector_autoscaling.yml: $(jsonnet_files)
JSONNET_PATH=etc jsonnet \
JSONNET_PATH=jsonnet:etc jsonnet \
--output-file etc/$@ \
--ext-str pg_version=$(PG_VERSION) \
etc/neon_collector_autoscaling.jsonnet
sql_exporter.yml: $(jsonnet_files)

View File

@@ -28,7 +28,7 @@ function(collector_file, application_name='sql_exporter') {
// Collectors (referenced by name) to execute on the target.
// Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
collectors: [
'neon_collector_autoscaling',
'neon_collector',
],
},

View File

@@ -0,0 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;

View File

@@ -1,3 +1,8 @@
local neon = import 'neon.libsonnet';
local pg_stat_bgwriter = importstr 'sql_exporter/checkpoints_req.sql';
local pg_stat_checkpointer = importstr 'sql_exporter/checkpoints_req.17.sql';
{
metric_name: 'checkpoints_req',
type: 'gauge',
@@ -6,5 +11,5 @@
values: [
'checkpoints_req',
],
query: importstr 'sql_exporter/checkpoints_req.sql',
query: if neon.PG_MAJORVERSION_NUM < 17 then pg_stat_bgwriter else pg_stat_checkpointer,
}

View File

@@ -0,0 +1 @@
SELECT num_timed AS checkpoints_timed FROM pg_stat_checkpointer;

View File

@@ -1,3 +1,8 @@
local neon = import 'neon.libsonnet';
local pg_stat_bgwriter = importstr 'sql_exporter/checkpoints_req.sql';
local pg_stat_checkpointer = importstr 'sql_exporter/checkpoints_req.17.sql';
{
metric_name: 'checkpoints_timed',
type: 'gauge',
@@ -6,5 +11,5 @@
values: [
'checkpoints_timed',
],
query: importstr 'sql_exporter/checkpoints_timed.sql',
query: if neon.PG_MAJORVERSION_NUM < 17 then pg_stat_bgwriter else pg_stat_checkpointer,
}

View File

@@ -0,0 +1,16 @@
local MIN_SUPPORTED_VERSION = 14;
local MAX_SUPPORTED_VERSION = 17;
local SUPPORTED_VERSIONS = std.range(MIN_SUPPORTED_VERSION, MAX_SUPPORTED_VERSION);
# If we receive the pg_version with a leading "v", ditch it.
local pg_version = std.strReplace(std.extVar('pg_version'), 'v', '');
local pg_version_num = std.parseInt(pg_version);
assert std.setMember(pg_version_num, SUPPORTED_VERSIONS) :
std.format('%s is an unsupported Postgres version: %s',
[pg_version, std.toString(SUPPORTED_VERSIONS)]);
{
PG_MAJORVERSION: pg_version,
PG_MAJORVERSION_NUM: pg_version_num,
}

View File

@@ -33,6 +33,7 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
}
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
///
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {

View File

@@ -16,7 +16,7 @@ aws-sdk-s3.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper0 = { workspace = true, features = ["stream"] }
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
serde.workspace = true
serde_json.workspace = true
@@ -36,6 +36,7 @@ azure_storage.workspace = true
azure_storage_blobs.workspace = true
futures-util.workspace = true
http-types.workspace = true
http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }

View File

@@ -28,13 +28,15 @@ use aws_sdk_s3::{
Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;
use http_body_util::StreamBody;
use http_types::StatusCode;
use aws_smithy_types::{body::SdkBody, DateTime};
use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
use bytes::Bytes;
use futures::stream::Stream;
use hyper0::Body;
use futures_util::StreamExt;
use hyper::body::Frame;
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -710,8 +712,8 @@ impl RemoteStorage for S3Bucket {
let started_at = start_measuring_requests(kind);
let body = Body::wrap_stream(from);
let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
let upload = self
.client

View File

@@ -720,7 +720,12 @@ async fn timeline_archival_config_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.apply_timeline_archival_config(timeline_id, request_data.state, ctx)
.apply_timeline_archival_config(
timeline_id,
request_data.state,
state.broker_client.clone(),
ctx,
)
.await?;
Ok::<_, ApiError>(())
}

View File

@@ -67,7 +67,7 @@ use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
@@ -632,7 +632,7 @@ pub enum TimelineArchivalError {
AlreadyInProgress,
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(anyhow::Error),
}
impl Debug for TimelineArchivalError {
@@ -1554,6 +1554,7 @@ impl Tenant {
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
broker_client: storage_broker::BrokerClientChannel,
ctx: RequestContext,
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
@@ -1602,27 +1603,40 @@ impl Tenant {
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_shard_id
)
})?;
})
.map_err(TimelineArchivalError::Other)?;
let timelines = self.timelines.lock().unwrap();
if let Some(timeline) = timelines.get(&timeline_id) {
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
info!("timeline unoffloading complete");
Ok(Arc::clone(timeline))
} else {
let Some(timeline) = timelines.get(&timeline_id) else {
warn!("timeline not available directly after attach");
Err(TimelineArchivalError::Other(anyhow::anyhow!(
return Err(TimelineArchivalError::Other(anyhow::anyhow!(
"timeline not available directly after attach"
)))
)));
};
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
let background_jobs_can_start = None;
timeline.activate(
self.clone(),
broker_client.clone(),
background_jobs_can_start,
&ctx,
);
}
info!("timeline unoffloading complete");
Ok(Arc::clone(timeline))
}
pub(crate) async fn apply_timeline_archival_config(
self: &Arc<Self>,
timeline_id: TimelineId,
new_state: TimelineArchivalState,
broker_client: storage_broker::BrokerClientChannel,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
@@ -1663,18 +1677,29 @@ impl Tenant {
Some(Arc::clone(timeline))
};
// Second part: unarchive timeline (if needed)
// Second part: unoffload timeline (if needed)
let timeline = if let Some(timeline) = timeline_or_unarchive_offloaded {
timeline
} else {
// Turn offloaded timeline into a non-offloaded one
self.unoffload_timeline(timeline_id, ctx).await?
self.unoffload_timeline(timeline_id, broker_client, ctx)
.await?
};
// Third part: upload new timeline archival state and block until it is present in S3
let upload_needed = timeline
let upload_needed = match timeline
.remote_client
.schedule_index_upload_for_timeline_archival_state(new_state)?;
.schedule_index_upload_for_timeline_archival_state(new_state)
{
Ok(upload_needed) => upload_needed,
Err(e) => {
if timeline.cancel.is_cancelled() {
return Err(TimelineArchivalError::Cancelled);
} else {
return Err(TimelineArchivalError::Other(e));
}
}
};
if upload_needed {
info!("Uploading new state");
@@ -1685,7 +1710,14 @@ impl Tenant {
tracing::warn!("reached timeout for waiting on upload queue");
return Err(TimelineArchivalError::Timeout);
};
v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?;
v.map_err(|e| match e {
WaitCompletionError::NotInitialized(e) => {
TimelineArchivalError::Other(anyhow::anyhow!(e))
}
WaitCompletionError::UploadQueueShutDownOrStopped => {
TimelineArchivalError::Cancelled
}
})?;
}
Ok(())
}
@@ -3336,7 +3368,7 @@ impl Tenant {
/// Populate all Timelines' `GcInfo` with information about their children. We do not set the
/// PITR cutoffs here, because that requires I/O: this is done later, before GC, by [`Self::refresh_gc_info_internal`]
///
/// Subsequently, parent-child relationships are updated incrementally during timeline creation/deletion.
/// Subsequently, parent-child relationships are updated incrementally inside [`Timeline::new`] and [`Timeline::drop`].
fn initialize_gc_info(
&self,
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,

View File

@@ -3092,7 +3092,6 @@ impl Timeline {
}
impl Timeline {
#[allow(unknown_lints)] // doc_lazy_continuation is still a new lint
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///

View File

@@ -617,31 +617,34 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
/*
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (entry->bitmap[chunk_offs >> 5] == 0)
if (entry->access_count == 0)
{
bool has_remaining_pages = false;
for (int i = 0; i < CHUNK_BITMAP_SIZE; i++)
{
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when we
* have no cached pages remaining in the chunk
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (!has_remaining_pages)
if (entry->bitmap[chunk_offs >> 5] == 0)
{
dlist_delete(&entry->list_node);
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
bool has_remaining_pages = false;
for (int i = 0; i < CHUNK_BITMAP_SIZE; i++)
{
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when we
* have no cached pages remaining in the chunk
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->list_node);
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
}
}
}

View File

@@ -1,16 +1,15 @@
use super::{ComputeCredentials, ComputeUserInfo};
use crate::{
auth::{self, backend::ComputeCredentialKeys, AuthFlow},
compute,
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::AuthSecret,
sasl,
stream::{PqStream, Stream},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use super::{ComputeCredentials, ComputeUserInfo};
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::{self, AuthFlow};
use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::AuthSecret;
use crate::stream::{PqStream, Stream};
use crate::{compute, sasl};
pub(super) async fn authenticate(
ctx: &RequestMonitoring,
creds: ComputeUserInfo,

View File

@@ -1,15 +1,3 @@
use crate::{
auth,
cache::Cached,
compute,
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::{self, provider::NodeInfo, CachedNodeInfo},
error::{ReportableError, UserFacingError},
proxy::connect_compute::ComputeConnectBackend,
stream::PqStream,
waiters,
};
use async_trait::async_trait;
use pq_proto::BeMessage as Be;
use thiserror::Error;
@@ -18,6 +6,15 @@ use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
use super::ComputeCredentialKeys;
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::provider::NodeInfo;
use crate::control_plane::{self, CachedNodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::stream::PqStream;
use crate::{auth, compute, waiters};
#[derive(Debug, Error)]
pub(crate) enum WebAuthError {

View File

@@ -1,16 +1,15 @@
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::{
auth::{self, AuthFlow},
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::AuthSecret,
intern::EndpointIdInt,
sasl,
stream::{self, Stream},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::auth::{self, AuthFlow};
use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::AuthSecret;
use crate::intern::EndpointIdInt;
use crate::sasl;
use crate::stream::{self, Stream};
/// Compared to [SCRAM](crate::scram), cleartext password auth saves
/// one round trip and *expensive* computations (>= 4096 HMAC iterations).
/// These properties are benefical for serverless JS workers, so we

View File

@@ -1,22 +1,22 @@
use std::{
future::Future,
sync::Arc,
time::{Duration, SystemTime},
};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use jose_jwk::crypto::KeyInfo;
use serde::{de::Visitor, Deserialize, Deserializer};
use serde::de::Visitor;
use serde::{Deserialize, Deserializer};
use signature::Verifier;
use thiserror::Error;
use tokio::time::Instant;
use crate::{
auth::backend::ComputeCredentialKeys, context::RequestMonitoring,
control_plane::errors::GetEndpointJwksError, http::parse_json_body_with_limit,
intern::RoleNameInt, EndpointId, RoleName,
};
use crate::auth::backend::ComputeCredentialKeys;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetEndpointJwksError;
use crate::http::parse_json_body_with_limit;
use crate::intern::RoleNameInt;
use crate::{EndpointId, RoleName};
// TODO(conrad): make these configurable.
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
@@ -381,10 +381,8 @@ fn verify_rsa_signature(
alg: &jose_jwa::Algorithm,
) -> Result<(), JwtError> {
use jose_jwa::{Algorithm, Signing};
use rsa::{
pkcs1v15::{Signature, VerifyingKey},
RsaPublicKey,
};
use rsa::pkcs1v15::{Signature, VerifyingKey};
use rsa::RsaPublicKey;
let key = RsaPublicKey::try_from(key).map_err(JwtError::InvalidRsaKey)?;
@@ -655,11 +653,9 @@ impl From<&jose_jwk::Key> for KeyType {
#[cfg(test)]
mod tests {
use crate::RoleName;
use super::*;
use std::{future::IntoFuture, net::SocketAddr, time::SystemTime};
use std::future::IntoFuture;
use std::net::SocketAddr;
use std::time::SystemTime;
use base64::URL_SAFE_NO_PAD;
use bytes::Bytes;
@@ -672,6 +668,9 @@ mod tests {
use signature::Signer;
use tokio::net::TcpListener;
use super::*;
use crate::RoleName;
fn new_ec_jwk(kid: String) -> (p256::SecretKey, jose_jwk::Jwk) {
let sk = p256::SecretKey::random(&mut OsRng);
let pk = sk.public_key().into();

View File

@@ -2,19 +2,14 @@ use std::net::SocketAddr;
use arc_swap::ArcSwapOption;
use crate::{
auth::backend::jwt::FetchAuthRulesError,
compute::ConnCfg,
context::RequestMonitoring,
control_plane::{
messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo},
NodeInfo,
},
intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag},
EndpointId,
};
use super::jwt::{AuthRule, FetchAuthRules};
use crate::auth::backend::jwt::FetchAuthRulesError;
use crate::compute::ConnCfg;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo};
use crate::control_plane::NodeInfo;
use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag};
use crate::EndpointId;
pub struct LocalBackend {
pub(crate) node_info: NodeInfo,

View File

@@ -17,29 +17,22 @@ use tokio_postgres::config::AuthKeys;
use tracing::{info, warn};
use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::{validate_password_and_exchange, AuthError};
use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint};
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::provider::{CachedRoleSecret, ControlPlaneBackend};
use crate::control_plane::AuthSecret;
use crate::control_plane::provider::{
CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneBackend,
};
use crate::control_plane::{self, Api, AuthSecret};
use crate::intern::EndpointIdInt;
use crate::metrics::Metrics;
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter, RateBucketInfo};
use crate::stream::Stream;
use crate::{
auth::{self, ComputeUserInfoMaybeEndpoint},
config::AuthenticationConfig,
control_plane::{
self,
provider::{CachedAllowedIps, CachedNodeInfo},
Api,
},
stream,
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
use crate::{scram, stream, EndpointCacheKey, EndpointId, RoleName};
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
pub enum MaybeOwned<'a, T> {
@@ -500,34 +493,32 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
#[cfg(test)]
mod tests {
use std::{net::IpAddr, sync::Arc, time::Duration};
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use once_cell::sync::Lazy;
use postgres_protocol::{
authentication::sasl::{ChannelBinding, ScramSha256},
message::{backend::Message as PgMessage, frontend},
};
use postgres_protocol::authentication::sasl::{ChannelBinding, ScramSha256};
use postgres_protocol::message::backend::Message as PgMessage;
use postgres_protocol::message::frontend;
use provider::AuthSecret;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use crate::{
auth::{backend::MaskedIp, ComputeUserInfoMaybeEndpoint, IpPattern},
config::AuthenticationConfig,
context::RequestMonitoring,
control_plane::{
self,
provider::{self, CachedAllowedIps, CachedRoleSecret},
CachedNodeInfo,
},
proxy::NeonOptions,
rate_limiter::{EndpointRateLimiter, RateBucketInfo},
scram::{threadpool::ThreadPool, ServerSecret},
stream::{PqStream, Stream},
};
use super::{auth_quirks, jwt::JwkCache, AuthRateLimiter};
use super::jwt::JwkCache;
use super::{auth_quirks, AuthRateLimiter};
use crate::auth::backend::MaskedIp;
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::provider::{self, CachedAllowedIps, CachedRoleSecret};
use crate::control_plane::{self, CachedNodeInfo};
use crate::proxy::NeonOptions;
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo};
use crate::scram::threadpool::ThreadPool;
use crate::scram::ServerSecret;
use crate::stream::{PqStream, Stream};
struct Auth {
ips: Vec<IpPattern>,

View File

@@ -1,20 +1,22 @@
//! User credentials used in authentication.
use crate::{
auth::password_hack::parse_endpoint_param,
context::RequestMonitoring,
error::{ReportableError, UserFacingError},
metrics::{Metrics, SniKind},
proxy::NeonOptions,
serverless::SERVERLESS_DRIVER_SNI,
EndpointId, RoleName,
};
use std::collections::HashSet;
use std::net::IpAddr;
use std::str::FromStr;
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{collections::HashSet, net::IpAddr, str::FromStr};
use thiserror::Error;
use tracing::{info, warn};
use crate::auth::password_hack::parse_endpoint_param;
use crate::context::RequestMonitoring;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniKind};
use crate::proxy::NeonOptions;
use crate::serverless::SERVERLESS_DRIVER_SNI;
use crate::{EndpointId, RoleName};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub(crate) enum ComputeUserInfoParseError {
#[error("Parameter '{0}' is missing in startup packet.")]
@@ -249,10 +251,11 @@ fn project_name_valid(name: &str) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use ComputeUserInfoParseError::*;
use super::*;
#[test]
fn parse_bare_minimum() -> anyhow::Result<()> {
// According to postgresql, only `user` should be required.

View File

@@ -1,21 +1,24 @@
//! Main authentication flow.
use super::{backend::ComputeCredentialKeys, AuthErrorImpl, PasswordHackPayload};
use crate::{
config::TlsServerEndPoint,
context::RequestMonitoring,
control_plane::AuthSecret,
intern::EndpointIdInt,
sasl,
scram::{self, threadpool::ThreadPool},
stream::{PqStream, Stream},
};
use std::io;
use std::sync::Arc;
use postgres_protocol::authentication::sasl::{SCRAM_SHA_256, SCRAM_SHA_256_PLUS};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use std::{io, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use super::backend::ComputeCredentialKeys;
use super::{AuthError, PasswordHackPayload};
use crate::config::TlsServerEndPoint;
use crate::context::RequestMonitoring;
use crate::control_plane::AuthSecret;
use crate::intern::EndpointIdInt;
use crate::sasl;
use crate::scram::threadpool::ThreadPool;
use crate::scram::{self};
use crate::stream::{PqStream, Stream};
/// Every authentication selector is supposed to implement this trait.
pub(crate) trait AuthMethod {
/// Any authentication selector should provide initial backend message
@@ -114,14 +117,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
let msg = self.stream.read_password_message().await?;
let password = msg
.strip_suffix(&[0])
.ok_or(AuthErrorImpl::MalformedPassword("missing terminator"))?;
.ok_or(AuthError::MalformedPassword("missing terminator"))?;
let payload = PasswordHackPayload::parse(password)
// If we ended up here and the payload is malformed, it means that
// the user neither enabled SNI nor resorted to any other method
// for passing the project name we rely on. We should show them
// the most helpful error message and point to the documentation.
.ok_or(AuthErrorImpl::MissingEndpointName)?;
.ok_or(AuthError::MissingEndpointName)?;
Ok(payload)
}
@@ -133,7 +136,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, CleartextPassword> {
let msg = self.stream.read_password_message().await?;
let password = msg
.strip_suffix(&[0])
.ok_or(AuthErrorImpl::MalformedPassword("missing terminator"))?;
.ok_or(AuthError::MalformedPassword("missing terminator"))?;
let outcome = validate_password_and_exchange(
&self.state.pool,
@@ -163,7 +166,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
// Initial client message contains the chosen auth method's name.
let msg = self.stream.read_password_message().await?;
let sasl = sasl::FirstMessage::parse(&msg)
.ok_or(AuthErrorImpl::MalformedPassword("bad sasl message"))?;
.ok_or(AuthError::MalformedPassword("bad sasl message"))?;
// Currently, the only supported SASL method is SCRAM.
if !scram::METHODS.contains(&sasl.method) {

View File

@@ -14,22 +14,22 @@ pub(crate) use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;
mod flow;
use std::io;
use std::net::IpAddr;
pub(crate) use flow::*;
use thiserror::Error;
use tokio::time::error::Elapsed;
use crate::{
control_plane,
error::{ReportableError, UserFacingError},
};
use std::{io, net::IpAddr};
use thiserror::Error;
use crate::control_plane;
use crate::error::{ReportableError, UserFacingError};
/// Convenience wrapper for the authentication error.
pub(crate) type Result<T> = std::result::Result<T, AuthError>;
/// Common authentication error.
#[derive(Debug, Error)]
pub(crate) enum AuthErrorImpl {
pub(crate) enum AuthError {
#[error(transparent)]
Web(#[from] backend::WebAuthError),
@@ -78,80 +78,70 @@ pub(crate) enum AuthErrorImpl {
ConfirmationTimeout(humantime::Duration),
}
#[derive(Debug, Error)]
#[error(transparent)]
pub(crate) struct AuthError(Box<AuthErrorImpl>);
impl AuthError {
pub(crate) fn bad_auth_method(name: impl Into<Box<str>>) -> Self {
AuthErrorImpl::BadAuthMethod(name.into()).into()
AuthError::BadAuthMethod(name.into())
}
pub(crate) fn auth_failed(user: impl Into<Box<str>>) -> Self {
AuthErrorImpl::AuthFailed(user.into()).into()
AuthError::AuthFailed(user.into())
}
pub(crate) fn ip_address_not_allowed(ip: IpAddr) -> Self {
AuthErrorImpl::IpAddressNotAllowed(ip).into()
AuthError::IpAddressNotAllowed(ip)
}
pub(crate) fn too_many_connections() -> Self {
AuthErrorImpl::TooManyConnections.into()
AuthError::TooManyConnections
}
pub(crate) fn is_auth_failed(&self) -> bool {
matches!(self.0.as_ref(), AuthErrorImpl::AuthFailed(_))
matches!(self, AuthError::AuthFailed(_))
}
pub(crate) fn user_timeout(elapsed: Elapsed) -> Self {
AuthErrorImpl::UserTimeout(elapsed).into()
AuthError::UserTimeout(elapsed)
}
pub(crate) fn confirmation_timeout(timeout: humantime::Duration) -> Self {
AuthErrorImpl::ConfirmationTimeout(timeout).into()
}
}
impl<E: Into<AuthErrorImpl>> From<E> for AuthError {
fn from(e: E) -> Self {
Self(Box::new(e.into()))
AuthError::ConfirmationTimeout(timeout)
}
}
impl UserFacingError for AuthError {
fn to_string_client(&self) -> String {
match self.0.as_ref() {
AuthErrorImpl::Web(e) => e.to_string_client(),
AuthErrorImpl::GetAuthInfo(e) => e.to_string_client(),
AuthErrorImpl::Sasl(e) => e.to_string_client(),
AuthErrorImpl::AuthFailed(_) => self.to_string(),
AuthErrorImpl::BadAuthMethod(_) => self.to_string(),
AuthErrorImpl::MalformedPassword(_) => self.to_string(),
AuthErrorImpl::MissingEndpointName => self.to_string(),
AuthErrorImpl::Io(_) => "Internal error".to_string(),
AuthErrorImpl::IpAddressNotAllowed(_) => self.to_string(),
AuthErrorImpl::TooManyConnections => self.to_string(),
AuthErrorImpl::UserTimeout(_) => self.to_string(),
AuthErrorImpl::ConfirmationTimeout(_) => self.to_string(),
match self {
Self::Web(e) => e.to_string_client(),
Self::GetAuthInfo(e) => e.to_string_client(),
Self::Sasl(e) => e.to_string_client(),
Self::AuthFailed(_) => self.to_string(),
Self::BadAuthMethod(_) => self.to_string(),
Self::MalformedPassword(_) => self.to_string(),
Self::MissingEndpointName => self.to_string(),
Self::Io(_) => "Internal error".to_string(),
Self::IpAddressNotAllowed(_) => self.to_string(),
Self::TooManyConnections => self.to_string(),
Self::UserTimeout(_) => self.to_string(),
Self::ConfirmationTimeout(_) => self.to_string(),
}
}
}
impl ReportableError for AuthError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self.0.as_ref() {
AuthErrorImpl::Web(e) => e.get_error_kind(),
AuthErrorImpl::GetAuthInfo(e) => e.get_error_kind(),
AuthErrorImpl::Sasl(e) => e.get_error_kind(),
AuthErrorImpl::AuthFailed(_) => crate::error::ErrorKind::User,
AuthErrorImpl::BadAuthMethod(_) => crate::error::ErrorKind::User,
AuthErrorImpl::MalformedPassword(_) => crate::error::ErrorKind::User,
AuthErrorImpl::MissingEndpointName => crate::error::ErrorKind::User,
AuthErrorImpl::Io(_) => crate::error::ErrorKind::ClientDisconnect,
AuthErrorImpl::IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
AuthErrorImpl::TooManyConnections => crate::error::ErrorKind::RateLimit,
AuthErrorImpl::UserTimeout(_) => crate::error::ErrorKind::User,
AuthErrorImpl::ConfirmationTimeout(_) => crate::error::ErrorKind::User,
match self {
Self::Web(e) => e.get_error_kind(),
Self::GetAuthInfo(e) => e.get_error_kind(),
Self::Sasl(e) => e.get_error_kind(),
Self::AuthFailed(_) => crate::error::ErrorKind::User,
Self::BadAuthMethod(_) => crate::error::ErrorKind::User,
Self::MalformedPassword(_) => crate::error::ErrorKind::User,
Self::MissingEndpointName => crate::error::ErrorKind::User,
Self::Io(_) => crate::error::ErrorKind::ClientDisconnect,
Self::IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
Self::TooManyConnections => crate::error::ErrorKind::RateLimit,
Self::UserTimeout(_) => crate::error::ErrorKind::User,
Self::ConfirmationTimeout(_) => crate::error::ErrorKind::User,
}
}
}

View File

@@ -1,41 +1,43 @@
use std::{net::SocketAddr, pin::pin, str::FromStr, sync::Arc, time::Duration};
use std::net::SocketAddr;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use compute_api::spec::LocalProxySpec;
use dashmap::DashMap;
use futures::future::Either;
use proxy::{
auth::{
self,
backend::{
jwt::JwkCache,
local::{LocalBackend, JWKS_ROLE_MAP},
},
},
cancellation::CancellationHandlerMain,
config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig},
control_plane::{
locks::ApiLocks,
messages::{EndpointJwksResponse, JwksSettings},
},
http::health_server::AppMetrics,
intern::RoleNameInt,
metrics::{Metrics, ThreadPoolMetrics},
rate_limiter::{BucketRateLimiter, EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo},
scram::threadpool::ThreadPool,
serverless::{self, cancel_set::CancelSet, GlobalConnPoolOptions},
RoleName,
use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use proxy::auth::{self};
use proxy::cancellation::CancellationHandlerMain;
use proxy::config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig};
use proxy::control_plane::locks::ApiLocks;
use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
use proxy::http::health_server::AppMetrics;
use proxy::intern::RoleNameInt;
use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::rate_limiter::{
BucketRateLimiter, EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo,
};
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::{self, GlobalConnPoolOptions};
use proxy::RoleName;
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use clap::Parser;
use tokio::{net::TcpListener, sync::Notify, task::JoinSet};
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::{pid_file, project_build_tag, project_git_version, sentry_init::init_sentry};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version};
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

View File

@@ -5,25 +5,23 @@
/// the outside. Similar to an ingress controller for HTTPS.
use std::{net::SocketAddr, sync::Arc};
use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::future::Either;
use futures::TryFutureExt;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::context::RequestMonitoring;
use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use rustls::pki_types::PrivateKeyDer;
use tokio::net::TcpListener;
use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::TryFutureExt;
use proxy::stream::{PqStream, Stream};
use rustls::pki_types::PrivateKeyDer;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use utils::{project_git_version, sentry_init::init_sentry};
use tracing::{error, info, Instrument};
use utils::project_git_version;
use utils::sentry_init::init_sentry;
project_git_version!(GIT_VERSION);

View File

@@ -1,3 +1,8 @@
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use anyhow::bail;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
@@ -7,52 +12,34 @@ use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_config::Region;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::jwt::JwkCache;
use proxy::auth::backend::AuthRateLimiter;
use proxy::auth::backend::ConsoleRedirectBackend;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::remote_storage_from_toml;
use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
use proxy::config::ProjectInfoCacheOptions;
use proxy::config::ProxyProtocolV2;
use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
use proxy::cancellation::{CancelMap, CancellationHandler};
use proxy::config::{
self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, HttpConfig,
ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2,
};
use proxy::context::parquet::ParquetUploadArgs;
use proxy::control_plane;
use proxy::http;
use proxy::http::health_server::AppMetrics;
use proxy::metrics::Metrics;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::LeakyBucketConfig;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::WakeComputeRateLimiter;
use proxy::rate_limiter::{
EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo, WakeComputeRateLimiter,
};
use proxy::redis::cancellation_publisher::RedisPublisherClient;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use proxy::redis::notifications;
use proxy::redis::{elasticache, notifications};
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;
use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use proxy::{auth, control_plane, http, serverless, usage_metrics};
use remote_storage::RemoteStorageConfig;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use tracing::Instrument;
use utils::{project_build_tag, project_git_version, sentry_init::init_sentry};
use tracing::{info, warn, Instrument};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

View File

@@ -1,31 +1,23 @@
use std::{
convert::Infallible,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::convert::Infallible;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashSet;
use redis::{
streams::{StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue, Value,
};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue, Value};
use serde::Deserialize;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::info;
use crate::{
config::EndpointCacheConfig,
context::RequestMonitoring,
intern::{BranchIdInt, EndpointIdInt, ProjectIdInt},
metrics::{Metrics, RedisErrors, RedisEventsCount},
rate_limiter::GlobalRateLimiter,
redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider,
EndpointId,
};
use crate::config::EndpointCacheConfig;
use crate::context::RequestMonitoring;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
use crate::rate_limiter::GlobalRateLimiter;
use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::EndpointId;
#[derive(Deserialize, Debug, Clone)]
pub(crate) struct ControlPlaneEventKey {

View File

@@ -1,9 +1,8 @@
use std::{
collections::HashSet,
convert::Infallible,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use dashmap::DashMap;
@@ -13,15 +12,12 @@ use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info};
use crate::{
auth::IpPattern,
config::ProjectInfoCacheOptions,
control_plane::AuthSecret,
intern::{EndpointIdInt, ProjectIdInt, RoleNameInt},
EndpointId, RoleName,
};
use super::{Cache, Cached};
use crate::auth::IpPattern;
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::AuthSecret;
use crate::intern::{EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::{EndpointId, RoleName};
#[async_trait]
pub(crate) trait ProjectInfoCache {
@@ -371,7 +367,8 @@ impl Cache for ProjectInfoCacheImpl {
#[cfg(test)]
mod tests {
use super::*;
use crate::{scram::ServerSecret, ProjectId};
use crate::scram::ServerSecret;
use crate::ProjectId;
#[tokio::test]
async fn test_project_info_cache_settings() {

View File

@@ -1,9 +1,6 @@
use std::{
borrow::Borrow,
hash::Hash,
time::{Duration, Instant},
};
use tracing::debug;
use std::borrow::Borrow;
use std::hash::Hash;
use std::time::{Duration, Instant};
// This seems to make more sense than `lru` or `cached`:
//
@@ -15,8 +12,10 @@ use tracing::debug;
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
use tracing::debug;
use super::{common::Cached, timed_lru, Cache};
use super::common::Cached;
use super::{timed_lru, Cache};
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:

View File

@@ -1,6 +1,8 @@
use std::net::SocketAddr;
use std::sync::Arc;
use dashmap::DashMap;
use pq_proto::CancelKeyData;
use std::{net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
@@ -8,12 +10,10 @@ use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use uuid::Uuid;
use crate::{
error::ReportableError,
metrics::{CancellationRequest, CancellationSource, Metrics},
redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
},
use crate::error::ReportableError;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
use crate::redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
};
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;

View File

@@ -1,25 +1,31 @@
use crate::{
auth::parse_endpoint_param,
cancellation::CancelClosure,
context::RequestMonitoring,
control_plane::{errors::WakeComputeError, messages::MetricsAuxInfo, provider::ApiLockError},
error::{ReportableError, UserFacingError},
metrics::{Metrics, NumDbConnectionsGuard},
proxy::neon_option,
Host,
};
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pq_proto::StartupMessageParams;
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use rustls::client::danger::ServerCertVerifier;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres_rustls::MakeRustlsConnect;
use tracing::{error, info, warn};
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::provider::ApiLockError;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::proxy::neon_option;
use crate::Host;
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
#[derive(Debug, Error)]

View File

@@ -1,29 +1,27 @@
use crate::{
auth::backend::{jwt::JwkCache, AuthRateLimiter},
control_plane::locks::ApiLocks,
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
scram::threadpool::ThreadPool,
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
Host,
};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, ensure, Context, Ok};
use clap::ValueEnum;
use itertools::Itertools;
use remote_storage::RemoteStorageConfig;
use rustls::{
crypto::ring::sign,
pki_types::{CertificateDer, PrivateKeyDer},
};
use rustls::crypto::ring::sign;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use sha2::{Digest, Sha256};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};
use tracing::{error, info};
use x509_parser::oid_registry;
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::AuthRateLimiter;
use crate::control_plane::locks::ApiLocks;
use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::GlobalConnPoolOptions;
use crate::Host;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub metric_collection: Option<MetricCollectionConfig>,
@@ -692,9 +690,8 @@ impl FromStr for ConcurrencyLockOptions {
#[cfg(test)]
mod tests {
use crate::rate_limiter::Aimd;
use super::*;
use crate::rate_limiter::Aimd;
#[test]
fn test_parse_cache_options() -> anyhow::Result<()> {

View File

@@ -1,25 +1,22 @@
use crate::auth::backend::ConsoleRedirectBackend;
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::proxy::{
prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource,
};
use crate::{
cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal},
context::RequestMonitoring,
error::ReportableError,
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::read_proxy_protocol,
proxy::handshake::{handshake, HandshakeData},
};
use futures::TryFutureExt;
use std::sync::Arc;
use futures::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, Instrument};
use crate::auth::backend::ConsoleRedirectBackend;
use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestMonitoring;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::read_proxy_protocol;
use crate::proxy::connect_compute::{connect_to_compute, TcpMechanism};
use crate::proxy::handshake::{handshake, HandshakeData};
use crate::proxy::passthrough::ProxyPassthrough;
use crate::proxy::{
connect_compute::{connect_to_compute, TcpMechanism},
passthrough::ProxyPassthrough,
prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource,
};
pub async fn task_main(

View File

@@ -1,24 +1,25 @@
//! Connection request monitoring contexts
use std::net::IpAddr;
use chrono::Utc;
use once_cell::sync::OnceCell;
use pq_proto::StartupMessageParams;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use tracing::{debug, field::display, info, info_span, Span};
use tracing::field::display;
use tracing::{debug, info, info_span, Span};
use try_lock::TryLock;
use uuid::Uuid;
use crate::{
control_plane::messages::{ColdStartInfo, MetricsAuxInfo},
error::ErrorKind,
intern::{BranchIdInt, ProjectIdInt},
metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
DbName, EndpointId, RoleName,
};
use self::parquet::RequestData;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::error::ErrorKind;
use crate::intern::{BranchIdInt, ProjectIdInt};
use crate::metrics::{
ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting,
};
use crate::{DbName, EndpointId, RoleName};
pub mod parquet;

View File

@@ -1,29 +1,28 @@
use std::{sync::Arc, time::SystemTime};
use std::sync::Arc;
use std::time::SystemTime;
use anyhow::Context;
use bytes::{buf::Writer, BufMut, BytesMut};
use bytes::buf::Writer;
use bytes::{BufMut, BytesMut};
use chrono::{Datelike, Timelike};
use futures::{Stream, StreamExt};
use parquet::{
basic::Compression,
file::{
metadata::RowGroupMetaDataPtr,
properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
writer::SerializedFileWriter,
},
record::RecordWriter,
};
use parquet::basic::Compression;
use parquet::file::metadata::RowGroupMetaDataPtr;
use parquet::file::properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE};
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use pq_proto::StartupMessageParams;
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
use serde::ser::SerializeMap;
use tokio::{sync::mpsc, time};
use tokio::sync::mpsc;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
use utils::backoff;
use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};
use super::{RequestMonitoringInner, LOG_CHAN};
use crate::config::remote_storage_from_toml;
use crate::context::LOG_CHAN_DISCONNECT;
#[derive(clap::Args, Clone, Debug)]
pub struct ParquetUploadArgs {
@@ -407,26 +406,26 @@ async fn upload_parquet(
#[cfg(test)]
mod tests {
use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
use std::net::Ipv4Addr;
use std::num::NonZeroUsize;
use std::sync::Arc;
use camino::Utf8Path;
use clap::Parser;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parquet::{
basic::{Compression, ZstdLevel},
file::{
properties::{WriterProperties, DEFAULT_PAGE_SIZE},
reader::FileReader,
serialized_reader::SerializedFileReader,
},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::{WriterProperties, DEFAULT_PAGE_SIZE};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use remote_storage::{
GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use tokio::{sync::mpsc, time};
use tokio::sync::mpsc;
use tokio::time;
use walkdir::WalkDir;
use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};

View File

@@ -1,9 +1,9 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};
use crate::auth::IpPattern;
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use crate::auth::IpPattern;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::proxy::retry::CouldRetry;
@@ -362,9 +362,10 @@ pub struct JwksSettings {
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use super::*;
fn dummy_aux() -> serde_json::Value {
json!({
"endpoint_id": "endpoint",

View File

@@ -1,16 +1,16 @@
use crate::{
control_plane::messages::{DatabaseInfo, KickSession},
waiters::{self, Waiter, Waiters},
};
use std::convert::Infallible;
use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::convert::Infallible;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, Instrument};
use crate::control_plane::messages::{DatabaseInfo, KickSession};
use crate::waiters::{self, Waiter, Waiters};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
/// Give caller an opportunity to wait for the cloud's reply.

View File

@@ -1,28 +1,29 @@
//! Mock console backend which relies on a user-provided postgres instance.
use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo,
};
use crate::{
auth::backend::jwt::AuthRule, context::RequestMonitoring,
control_plane::errors::GetEndpointJwksError, intern::RoleNameInt, RoleName,
};
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use crate::{auth::IpPattern, cache::Cached};
use crate::{
control_plane::{
messages::MetricsAuxInfo,
provider::{CachedAllowedIps, CachedRoleSecret},
},
BranchId, EndpointId, ProjectId,
};
use std::str::FromStr;
use std::sync::Arc;
use futures::TryFutureExt;
use std::{str::FromStr, sync::Arc};
use thiserror::Error;
use tokio_postgres::{config::SslMode, Client};
use tokio_postgres::config::SslMode;
use tokio_postgres::Client;
use tracing::{error, info, info_span, warn, Instrument};
use super::errors::{ApiError, GetAuthInfoError, WakeComputeError};
use super::{AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::IpPattern;
use crate::cache::Cached;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetEndpointJwksError;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::provider::{CachedAllowedIps, CachedRoleSecret};
use crate::error::io_error;
use crate::intern::RoleNameInt;
use crate::url::ApiUrl;
use crate::{compute, scram, BranchId, EndpointId, ProjectId, RoleName};
#[derive(Debug, Error)]
enum MockApiError {
#[error("Failed to read password: {0}")]

View File

@@ -2,39 +2,36 @@
pub mod mock;
pub mod neon;
use super::messages::{ControlPlaneError, MetricsAuxInfo};
use crate::{
auth::{
backend::{
jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError},
ComputeCredentialKeys, ComputeUserInfo,
},
IpPattern,
},
cache::{endpoints::EndpointsCache, project_info::ProjectInfoCacheImpl, Cached, TimedLru},
compute,
config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions},
context::RequestMonitoring,
error::ReportableError,
intern::ProjectIdInt,
metrics::ApiLockMetrics,
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
scram, EndpointCacheKey, EndpointId,
};
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use std::{hash::Hash, sync::Arc, time::Duration};
use tokio::time::Instant;
use tracing::info;
use super::messages::{ControlPlaneError, MetricsAuxInfo};
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::IpPattern;
use crate::cache::endpoints::EndpointsCache;
use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::cache::{Cached, TimedLru};
use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions};
use crate::context::RequestMonitoring;
use crate::error::ReportableError;
use crate::intern::ProjectIdInt;
use crate::metrics::ApiLockMetrics;
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
use crate::{compute, scram, EndpointCacheKey, EndpointId};
pub(crate) mod errors {
use crate::{
control_plane::messages::{self, ControlPlaneError, Reason},
error::{io_error, ErrorKind, ReportableError, UserFacingError},
proxy::retry::CouldRetry,
};
use thiserror::Error;
use super::ApiLockError;
use crate::control_plane::messages::{self, ControlPlaneError, Reason};
use crate::error::{io_error, ErrorKind, ReportableError, UserFacingError};
use crate::proxy::retry::CouldRetry;
/// A go-to error message which doesn't leak any detail.
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
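The rewritten headers also settle into a consistent grouping: `std` imports first, then external crates, then `super::`/`crate::` paths, as the provider module above shows. That ordering matches rustfmt's `group_imports = "StdExternalCrate"` option; treating it as the intended configuration is an assumption, since the diff does not include the formatter settings. A small self-contained sketch of the groups, with `thiserror` (already used throughout this diff) standing in for the external group:

```
// Group 1: standard library.
use std::sync::Arc;
use std::time::Duration;

// Group 2: external crates.
use thiserror::Error;

// Group 3 would hold `super::` / `crate::` imports, e.g. `use crate::error::ReportableError;`,
// but those paths only exist inside the proxy crate, so they stay as a comment here.

#[derive(Debug, Error)]
enum ExampleError {
    #[error("request timed out after {0:?}")]
    Timeout(Duration),
}

fn main() {
    let err = Arc::new(ExampleError::Timeout(Duration::from_secs(2)));
    println!("{err}");
}
```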

View File

@@ -1,31 +1,31 @@
//! Production console backend.
use super::{
super::messages::{ControlPlaneError, GetRoleSecret, WakeCompute},
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret,
NodeInfo,
};
use crate::{
auth::backend::{jwt::AuthRule, ComputeUserInfo},
compute,
control_plane::{
errors::GetEndpointJwksError,
messages::{ColdStartInfo, EndpointJwksResponse, Reason},
},
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::WakeComputeRateLimiter,
scram, EndpointCacheKey, EndpointId,
};
use crate::{cache::Cached, context::RequestMonitoring};
use ::http::{header::AUTHORIZATION, HeaderName};
use std::sync::Arc;
use std::time::Duration;
use ::http::header::AUTHORIZATION;
use ::http::HeaderName;
use futures::TryFutureExt;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{debug, info, info_span, warn, Instrument};
use super::super::messages::{ControlPlaneError, GetRoleSecret, WakeCompute};
use super::errors::{ApiError, GetAuthInfoError, WakeComputeError};
use super::{
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret,
NodeInfo,
};
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetEndpointJwksError;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::metrics::{CacheOutcome, Metrics};
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::{compute, http, scram, EndpointCacheKey, EndpointId};
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
#[derive(Clone)]

View File

@@ -1,4 +1,5 @@
use std::{error::Error as StdError, fmt, io};
use std::error::Error as StdError;
use std::{fmt, io};
use measured::FixedCardinalityLabel;

View File

@@ -1,19 +1,18 @@
use std::convert::Infallible;
use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, bail};
use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use measured::{text::BufferedTextEncoder, MetricGroup};
use hyper0::header::CONTENT_TYPE;
use hyper0::{Body, Request, Response, StatusCode};
use measured::text::BufferedTextEncoder;
use measured::MetricGroup;
use metrics::NeonMetrics;
use std::{
convert::Infallible,
net::TcpListener,
sync::{Arc, Mutex},
};
use tracing::{info, info_span};
use utils::http::{
endpoint::{self, request_span},
error::ApiError,
json::json_response,
RouterBuilder, RouterService,
};
use utils::http::endpoint::{self, request_span};
use utils::http::error::ApiError;
use utils::http::json::json_response;
use utils::http::{RouterBuilder, RouterService};
use crate::jemalloc;

View File

@@ -10,17 +10,15 @@ use anyhow::bail;
use bytes::Bytes;
use http_body_util::BodyExt;
use hyper::body::Body;
pub(crate) use reqwest::{Request, Response};
use reqwest_middleware::RequestBuilder;
pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error};
pub(crate) use reqwest_retry::policies::ExponentialBackoff;
pub(crate) use reqwest_retry::RetryTransientMiddleware;
use serde::de::DeserializeOwned;
pub(crate) use reqwest::{Request, Response};
pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error};
pub(crate) use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use crate::{
metrics::{ConsoleRequest, Metrics},
url::ApiUrl,
};
use reqwest_middleware::RequestBuilder;
use crate::metrics::{ConsoleRequest, Metrics};
use crate::url::ApiUrl;
/// This is the preferred way to create new http clients,
/// because it takes care of observability (OpenTelemetry).
@@ -142,9 +140,10 @@ pub(crate) async fn parse_json_body_with_limit<D: DeserializeOwned>(
#[cfg(test)]
mod tests {
use super::*;
use reqwest::Client;
use super::*;
#[test]
fn optional_query_params() -> anyhow::Result<()> {
let url = "http://example.com".parse()?;

View File

@@ -1,6 +1,8 @@
use std::{
hash::BuildHasherDefault, marker::PhantomData, num::NonZeroUsize, ops::Index, sync::OnceLock,
};
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::ops::Index;
use std::sync::OnceLock;
use lasso::{Capacity, MemoryLimits, Spur, ThreadedRodeo};
use rustc_hash::FxHasher;
@@ -208,9 +210,8 @@ impl From<ProjectId> for ProjectIdInt {
mod tests {
use std::sync::OnceLock;
use crate::intern::StringInterner;
use super::InternId;
use crate::intern::StringInterner;
struct MyId;
impl InternId for MyId {
@@ -222,7 +223,8 @@ mod tests {
#[test]
fn push_many_strings() {
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::Zipf;
let endpoint_dist = Zipf::new(500000, 0.8).unwrap();

View File

@@ -1,14 +1,12 @@
use std::marker::PhantomData;
use measured::{
label::NoLabels,
metric::{
gauge::GaugeState, group::Encoding, name::MetricNameEncoder, MetricEncoding,
MetricFamilyEncoding, MetricType,
},
text::TextEncoder,
LabelGroup, MetricGroup,
};
use measured::label::NoLabels;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::metric::name::MetricNameEncoder;
use measured::metric::{MetricEncoding, MetricFamilyEncoding, MetricType};
use measured::text::TextEncoder;
use measured::{LabelGroup, MetricGroup};
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version};
pub struct MetricRecorder {

View File

@@ -1,14 +1,10 @@
use tracing::Subscriber;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt::{
format::{Format, Full},
time::SystemTime,
FormatEvent, FormatFields,
},
prelude::*,
registry::LookupSpan,
};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt::format::{Format, Full};
use tracing_subscriber::fmt::time::SystemTime;
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
/// Initialize logging and OpenTelemetry tracing and exporter.
///

View File

@@ -1,14 +1,16 @@
use std::sync::{Arc, OnceLock};
use lasso::ThreadedRodeo;
use measured::label::{
FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
};
use measured::metric::histogram::Thresholds;
use measured::metric::name::MetricName;
use measured::{
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
metric::{histogram::Thresholds, name::MetricName},
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;

View File

@@ -1,11 +1,9 @@
//! Proxy Protocol V2 implementation
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use pin_project_lite::pin_project;

View File

@@ -1,24 +1,23 @@
use crate::{
auth::backend::ComputeCredentialKeys,
compute::COULD_NOT_CONNECT,
compute::{self, PostgresConnection},
config::RetryConfig,
context::RequestMonitoring,
control_plane::{self, errors::WakeComputeError, locks::ApiLocks, CachedNodeInfo, NodeInfo},
error::ReportableError,
metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
proxy::{
retry::{retry_after, should_retry, CouldRetry},
wake_compute::wake_compute,
},
Host,
};
use async_trait::async_trait;
use pq_proto::StartupMessageParams;
use tokio::time;
use tracing::{debug, info, warn};
use super::retry::ShouldRetryWakeCompute;
use crate::auth::backend::ComputeCredentialKeys;
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
use crate::config::RetryConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
};
use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
use crate::proxy::wake_compute::wake_compute;
use crate::Host;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);

View File

@@ -1,11 +1,11 @@
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
use std::future::poll_fn;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
#[derive(Debug)]
enum TransferState {
Running(CopyBuffer),
@@ -256,9 +256,10 @@ impl CopyBuffer {
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
use super::*;
#[tokio::test]
async fn test_client_to_compute() {
let (mut client_client, mut client_proxy) = tokio::io::duplex(8); // Create a mock duplex stream

View File

@@ -1,21 +1,19 @@
use bytes::Buf;
use pq_proto::framed::Framed;
use pq_proto::{
framed::Framed, BeMessage as Be, CancelKeyData, FeStartupPacket, ProtocolVersion,
StartupMessageParams,
BeMessage as Be, CancelKeyData, FeStartupPacket, ProtocolVersion, StartupMessageParams,
};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use crate::{
auth::endpoint_sni,
config::{TlsConfig, PG_ALPN_PROTOCOL},
context::RequestMonitoring,
error::ReportableError,
metrics::Metrics,
proxy::ERR_INSECURE_CONNECTION,
stream::{PqStream, Stream, StreamUpgradeError},
};
use crate::auth::endpoint_sni;
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
use crate::context::RequestMonitoring;
use crate::error::ReportableError;
use crate::metrics::Metrics;
use crate::proxy::ERR_INSECURE_CONNECTION;
use crate::stream::{PqStream, Stream, StreamUpgradeError};
#[derive(Error, Debug)]
pub(crate) enum HandshakeError {

View File

@@ -7,40 +7,32 @@ pub(crate) mod handshake;
pub(crate) mod passthrough;
pub(crate) mod retry;
pub(crate) mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;
pub use copy_bidirectional::ErrorSource;
use std::sync::Arc;
use crate::config::ProxyProtocolV2;
use crate::{
auth,
cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal},
compute,
config::{ProxyConfig, TlsConfig},
context::RequestMonitoring,
error::ReportableError,
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::read_proxy_protocol,
proxy::handshake::{handshake, HandshakeData},
rate_limiter::EndpointRateLimiter,
stream::{PqStream, Stream},
EndpointCacheKey,
};
pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource};
use futures::TryFutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pq_proto::{BeMessage as Be, StartupMessageParams};
use regex::Regex;
use smol_str::{format_smolstr, SmolStr};
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn, Instrument};
use self::{
connect_compute::{connect_to_compute, TcpMechanism},
passthrough::ProxyPassthrough,
};
use self::connect_compute::{connect_to_compute, TcpMechanism};
use self::passthrough::ProxyPassthrough;
use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::context::RequestMonitoring;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::read_proxy_protocol;
use crate::proxy::handshake::{handshake, HandshakeData};
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::{auth, compute, EndpointCacheKey};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";

View File

@@ -1,16 +1,14 @@
use crate::{
cancellation,
compute::PostgresConnection,
control_plane::messages::MetricsAuxInfo,
metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard},
stream::Stream,
usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
use crate::cancellation;
use crate::compute::PostgresConnection;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]

View File

@@ -1,7 +1,11 @@
use crate::{compute, config::RetryConfig};
use std::{error::Error, io};
use std::error::Error;
use std::io;
use tokio::time;
use crate::compute;
use crate::config::RetryConfig;
pub(crate) trait CouldRetry {
/// Returns true if the error could be retried
fn could_retry(&self) -> bool;

View File

@@ -6,7 +6,6 @@
use std::fmt::Debug;
use super::*;
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use postgres_protocol::message::frontend;
@@ -14,6 +13,8 @@ use tokio::io::{AsyncReadExt, DuplexStream};
use tokio_postgres::tls::TlsConnect;
use tokio_util::codec::{Decoder, Encoder};
use super::*;
enum Intercept {
None,
Methods,

View File

@@ -4,6 +4,16 @@ mod mitm;
use std::time::Duration;
use anyhow::{bail, Context};
use async_trait::async_trait;
use http::StatusCode;
use retry::{retry_after, ShouldRetryWakeCompute};
use rstest::rstest;
use rustls::pki_types;
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
use super::*;
@@ -18,15 +28,6 @@ use crate::control_plane::provider::{
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
use crate::{sasl, scram, BranchId, EndpointId, ProjectId};
use anyhow::{bail, Context};
use async_trait::async_trait;
use http::StatusCode;
use retry::{retry_after, ShouldRetryWakeCompute};
use rstest::rstest;
use rustls::pki_types;
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
/// Generate a set of TLS certificates: CA + server.
fn generate_certs(
@@ -336,7 +337,8 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
generate_tls_config("generic-project-name.localhost", "localhost")?;
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), Scram::mock()));
use rand::{distributions::Alphanumeric, Rng};
use rand::distributions::Alphanumeric;
use rand::Rng;
let password: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(rand::random::<u8>() as usize)

View File

@@ -1,16 +1,17 @@
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
use crate::config::RetryConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::{ControlPlaneError, Reason};
use crate::control_plane::{errors::WakeComputeError, provider::CachedNodeInfo};
use crate::control_plane::provider::CachedNodeInfo;
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
num_retries: &mut u32,

View File

@@ -1,7 +1,5 @@
use std::{
hash::Hash,
sync::atomic::{AtomicUsize, Ordering},
};
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use ahash::RandomState;
use dashmap::DashMap;

View File

@@ -1,10 +1,12 @@
//! Algorithms for controlling concurrency limits.
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use std::{pin::pin, sync::Arc, time::Duration};
use tokio::{
sync::Notify,
time::{error::Elapsed, Instant},
};
use tokio::sync::Notify;
use tokio::time::error::Elapsed;
use tokio::time::Instant;
use self::aimd::Aimd;

View File

@@ -60,12 +60,11 @@ impl LimitAlgorithm for Aimd {
mod tests {
use std::time::Duration;
use super::*;
use crate::rate_limiter::limit_algorithm::{
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
};
use super::*;
#[tokio::test(start_paused = true)]
async fn increase_decrease() {
let config = RateLimiterConfig {

View File

@@ -1,17 +1,14 @@
use std::{
borrow::Cow,
collections::hash_map::RandomState,
hash::{BuildHasher, Hash},
sync::{
atomic::{AtomicUsize, Ordering},
Mutex,
},
};
use std::borrow::Cow;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use anyhow::bail;
use dashmap::DashMap;
use itertools::Itertools;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
@@ -243,14 +240,17 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
#[cfg(test)]
mod tests {
use std::{hash::BuildHasherDefault, time::Duration};
use std::hash::BuildHasherDefault;
use std::time::Duration;
use rand::SeedableRng;
use rustc_hash::FxHasher;
use tokio::time;
use super::{BucketRateLimiter, WakeComputeRateLimiter};
use crate::{intern::EndpointIdInt, rate_limiter::RateBucketInfo, EndpointId};
use crate::intern::EndpointIdInt;
use crate::rate_limiter::RateBucketInfo;
use crate::EndpointId;
#[test]
fn rate_bucket_rpi() {

View File

@@ -2,13 +2,11 @@ mod leaky_bucket;
mod limit_algorithm;
mod limiter;
pub use leaky_bucket::{EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter};
#[cfg(test)]
pub(crate) use limit_algorithm::aimd::Aimd;
pub(crate) use limit_algorithm::{
DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
};
pub(crate) use limiter::GlobalRateLimiter;
pub use leaky_bucket::{EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter};
pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};

View File

@@ -5,13 +5,10 @@ use redis::AsyncCommands;
use tokio::sync::Mutex;
use uuid::Uuid;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
use super::{
connection_with_credentials_provider::ConnectionWithCredentialsProvider,
notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME},
};
pub trait CancellationPublisherMut: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(

View File

@@ -1,10 +1,9 @@
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use std::time::Duration;
use futures::FutureExt;
use redis::{
aio::{ConnectionLike, MultiplexedConnection},
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

View File

@@ -1,4 +1,5 @@
use std::{convert::Infallible, sync::Arc};
use std::convert::Infallible;
use std::sync::Arc;
use futures::StreamExt;
use pq_proto::CancelKeyData;
@@ -8,12 +9,10 @@ use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::{
cache::project_info::ProjectInfoCache,
cancellation::{CancelMap, CancellationHandler},
intern::{ProjectIdInt, RoleNameInt},
metrics::{Metrics, RedisErrors, RedisEventsCount},
};
use crate::cache::project_info::ProjectInfoCache;
use crate::cancellation::{CancelMap, CancellationHandler};
use crate::intern::{ProjectIdInt, RoleNameInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
@@ -269,10 +268,10 @@ where
#[cfg(test)]
mod tests {
use crate::{ProjectId, RoleName};
use serde_json::json;
use super::*;
use serde_json::json;
use crate::{ProjectId, RoleName};
#[test]
fn parse_allowed_ips() -> anyhow::Result<()> {

View File

@@ -1,8 +1,9 @@
//! Definitions for SASL messages.
use crate::parse::{split_at_const, split_cstr};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage};
use crate::parse::{split_at_const, split_cstr};
/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage).
#[derive(Debug)]
pub(crate) struct FirstMessage<'a> {

View File

@@ -10,13 +10,14 @@ mod channel_binding;
mod messages;
mod stream;
use crate::error::{ReportableError, UserFacingError};
use std::io;
use thiserror::Error;
pub(crate) use channel_binding::ChannelBinding;
pub(crate) use messages::FirstMessage;
pub(crate) use stream::{Outcome, SaslStream};
use thiserror::Error;
use crate::error::{ReportableError, UserFacingError};
/// Fine-grained auth errors help in writing tests.
#[derive(Error, Debug)]

View File

@@ -1,11 +1,14 @@
//! Abstraction for the string-oriented SASL protocols.
use super::{messages::ServerMessage, Mechanism};
use crate::stream::PqStream;
use std::io;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use super::messages::ServerMessage;
use super::Mechanism;
use crate::stream::PqStream;
/// Abstracts away all peculiarities of the libpq's protocol.
pub(crate) struct SaslStream<'a, S> {
/// The underlying stream.

View File

@@ -69,7 +69,9 @@ impl CountMinSketch {
#[cfg(test)]
mod tests {
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use super::CountMinSketch;

View File

@@ -209,7 +209,8 @@ impl sasl::Mechanism for Exchange<'_> {
type Output = super::ScramKey;
fn exchange(mut self, input: &str) -> sasl::Result<sasl::Step<Self, Self::Output>> {
use {sasl::Step, ExchangeState};
use sasl::Step;
use ExchangeState;
match &self.state {
ExchangeState::Initial(init) => {
match init.transition(self.secret, &self.tls_server_end_point, input)? {
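The splitting is not limited to file headers: a brace-grouped `use` written inline inside a function body gets the same treatment, as the exchange hunk above shows (`use {sasl::Step, ExchangeState};` becomes two separate statements). A minimal sketch of that pattern with `std` paths:

```
fn spread(values: &[u64]) -> (u64, u64) {
    // Before (hypothetical): a single grouped, statement-level import:
    //     use {std::cmp::max, std::cmp::min};
    // After: one `use` per path, matching the hunk above.
    use std::cmp::max;
    use std::cmp::min;

    // Returns (u64::MAX, 0) for empty input; good enough for a sketch.
    values
        .iter()
        .fold((u64::MAX, 0), |(lo, hi), &v| (min(lo, v), max(hi, v)))
}

fn main() {
    let (lo, hi) = spread(&[3, 9, 1]);
    println!("min={lo} max={hi}");
}
```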

View File

@@ -1,11 +1,12 @@
//! Definitions for SCRAM messages.
use std::fmt;
use std::ops::Range;
use super::base64_decode_array;
use super::key::{ScramKey, SCRAM_KEY_LEN};
use super::signature::SignatureBuilder;
use crate::sasl::ChannelBinding;
use std::fmt;
use std::ops::Range;
/// Faithfully taken from PostgreSQL.
pub(crate) const SCRAM_RAW_NONCE_LEN: usize = 18;

View File

@@ -16,10 +16,9 @@ mod signature;
pub mod threadpool;
pub(crate) use exchange::{exchange, Exchange};
use hmac::{Hmac, Mac};
pub(crate) use key::ScramKey;
pub(crate) use secret::ServerSecret;
use hmac::{Hmac, Mac};
use sha2::{Digest, Sha256};
const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
@@ -59,13 +58,11 @@ fn sha256<'a>(parts: impl IntoIterator<Item = &'a [u8]>) -> [u8; 32] {
#[cfg(test)]
mod tests {
use crate::{
intern::EndpointIdInt,
sasl::{Mechanism, Step},
EndpointId,
};
use super::{threadpool::ThreadPool, Exchange, ServerSecret};
use super::threadpool::ThreadPool;
use super::{Exchange, ServerSecret};
use crate::intern::EndpointIdInt;
use crate::sasl::{Mechanism, Step};
use crate::EndpointId;
#[test]
fn snapshot() {

View File

@@ -1,7 +1,6 @@
use hmac::{
digest::{consts::U32, generic_array::GenericArray},
Hmac, Mac,
};
use hmac::digest::consts::U32;
use hmac::digest::generic_array::GenericArray;
use hmac::{Hmac, Mac};
use sha2::Sha256;
pub(crate) struct Pbkdf2 {
@@ -66,10 +65,11 @@ impl Pbkdf2 {
#[cfg(test)]
mod tests {
use super::Pbkdf2;
use pbkdf2::pbkdf2_hmac_array;
use sha2::Sha256;
use super::Pbkdf2;
#[test]
fn works() {
let salt = b"sodium chloride";

View File

@@ -4,28 +4,21 @@
//! 1. Fairness per endpoint.
//! 2. Yield support for high iteration counts.
use std::{
cell::RefCell,
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
},
task::{Context, Poll},
};
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use futures::FutureExt;
use rand::Rng;
use rand::{rngs::SmallRng, SeedableRng};
use crate::{
intern::EndpointIdInt,
metrics::{ThreadPoolMetrics, ThreadPoolWorkerId},
scram::countmin::CountMinSketch,
};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use super::pbkdf2::Pbkdf2;
use crate::intern::EndpointIdInt;
use crate::metrics::{ThreadPoolMetrics, ThreadPoolWorkerId};
use crate::scram::countmin::CountMinSketch;
pub struct ThreadPool {
runtime: Option<tokio::runtime::Runtime>,
@@ -195,9 +188,8 @@ impl Drop for JobHandle {
#[cfg(test)]
mod tests {
use crate::EndpointId;
use super::*;
use crate::EndpointId;
#[tokio::test]
async fn hash_is_correct() {

View File

@@ -1,42 +1,34 @@
use std::{io, sync::Arc, time::Duration};
use std::io;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use p256::{ecdsa::SigningKey, elliptic_curve::JwkEcKey};
use p256::ecdsa::SigningKey;
use p256::elliptic_curve::JwkEcKey;
use rand::rngs::OsRng;
use tokio::net::{lookup_host, TcpStream};
use tracing::{debug, field::display, info};
use tracing::field::display;
use tracing::{debug, info};
use crate::{
auth::{
self,
backend::{local::StaticAuthRules, ComputeCredentials, ComputeUserInfo},
check_peer_addr_is_in_list, AuthError,
},
compute,
config::ProxyConfig,
context::RequestMonitoring,
control_plane::{
errors::{GetAuthInfoError, WakeComputeError},
locks::ApiLocks,
provider::ApiLockError,
CachedNodeInfo,
},
error::{ErrorKind, ReportableError, UserFacingError},
intern::EndpointIdInt,
proxy::{
connect_compute::ConnectMechanism,
retry::{CouldRetry, ShouldRetryWakeCompute},
},
rate_limiter::EndpointRateLimiter,
EndpointId, Host,
};
use super::{
conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool},
http_conn_pool::{self, poll_http2_client},
local_conn_pool::{self, LocalClient, LocalConnPool},
};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client};
use super::local_conn_pool::{self, LocalClient, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::provider::ApiLockError;
use crate::control_plane::CachedNodeInfo;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::intern::EndpointIdInt;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
use crate::rate_limiter::EndpointRateLimiter;
use crate::{compute, EndpointId, Host};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool>,

View File

@@ -1,10 +1,8 @@
//! A set for cancelling random http connections
use std::{
hash::{BuildHasher, BuildHasherDefault},
num::NonZeroUsize,
time::Duration,
};
use std::hash::{BuildHasher, BuildHasherDefault};
use std::num::NonZeroUsize;
use std::time::Duration;
use indexmap::IndexMap;
use parking_lot::Mutex;

View File

@@ -1,33 +1,31 @@
use std::collections::HashMap;
use std::fmt;
use std::ops::Deref;
use std::pin::pin;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use std::task::{ready, Poll};
use std::time::Duration;
use dashmap::DashMap;
use futures::{future::poll_fn, Future};
use futures::future::poll_fn;
use futures::Future;
use parking_lot::RwLock;
use rand::Rng;
use smallvec::SmallVec;
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use std::{
fmt,
task::{ready, Poll},
};
use std::{
ops::Deref,
sync::atomic::{self, AtomicUsize},
};
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use super::backend::HttpConnError;
use crate::auth::backend::ComputeUserInfo;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{
auth::backend::ComputeUserInfo, context::RequestMonitoring, DbName, EndpointCacheKey, RoleName,
};
use tracing::{debug, error, warn, Span};
use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
use crate::{DbName, EndpointCacheKey, RoleName};
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
@@ -724,13 +722,13 @@ impl<C: ClientInnerExt> Drop for Client<C> {
#[cfg(test)]
mod tests {
use std::{mem, sync::atomic::AtomicBool};
use crate::{
proxy::NeonOptions, serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId,
};
use std::mem;
use std::sync::atomic::AtomicBool;
use super::*;
use crate::proxy::NeonOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::{BranchId, EndpointId, ProjectId};
struct MockClient(Arc<AtomicBool>);
impl MockClient {

View File

@@ -1,22 +1,21 @@
use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use dashmap::DashMap;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::{sync::Arc, sync::Weak};
use tokio::net::TcpStream;
use tracing::{debug, error, info, info_span, Instrument};
use super::conn_pool::ConnInfo;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{context::RequestMonitoring, EndpointCacheKey};
use tracing::{debug, error};
use tracing::{info, info_span, Instrument};
use super::conn_pool::ConnInfo;
use crate::EndpointCacheKey;
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =

View File

@@ -1,12 +1,11 @@
//! Things stolen from `libs/utils/src/http` to add hyper 1.0 compatibility
//! Will merge back in at some point in the future.
use bytes::Bytes;
use anyhow::Context;
use bytes::Bytes;
use http::{Response, StatusCode};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use serde::Serialize;
use utils::http::error::ApiError;

View File

@@ -1,7 +1,5 @@
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use serde_json::{Map, Value};
use tokio_postgres::types::{Kind, Type};
use tokio_postgres::Row;
//
@@ -256,9 +254,10 @@ fn _pg_array_parse(
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use super::*;
#[test]
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];

View File

@@ -1,28 +1,31 @@
use futures::{future::poll_fn, Future};
use std::collections::HashMap;
use std::pin::pin;
use std::sync::{Arc, Weak};
use std::task::{ready, Poll};
use std::time::Duration;
use futures::future::poll_fn;
use futures::Future;
use indexmap::IndexMap;
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use p256::ecdsa::{Signature, SigningKey};
use parking_lot::RwLock;
use serde_json::value::RawValue;
use signature::Signer;
use std::task::{ready, Poll};
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::types::ToSql;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{context::RequestMonitoring, DbName, RoleName};
use tracing::{error, warn, Span};
use tracing::{info, info_span, Instrument};
use tracing::{error, info, info_span, warn, Instrument, Span};
use super::backend::HttpConnError;
use super::conn_pool::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, RoleName};
struct ConnPoolEntry<C: ClientInnerExt> {
conn: ClientInner<C>,

View File

@@ -12,12 +12,15 @@ mod local_conn_pool;
mod sql_over_http;
mod websocket;
use std::net::{IpAddr, SocketAddr};
use std::pin::{pin, Pin};
use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use atomic_take::AtomicTake;
use bytes::Bytes;
pub use conn_pool::GlobalConnPoolOptions;
use anyhow::Context;
use futures::future::{select, Either};
use futures::TryFutureExt;
use http::{Method, Response, StatusCode};
@@ -29,9 +32,13 @@ use hyper_util::server::conn::auto::Builder;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{info, warn, Instrument};
use utils::http::error::ApiError;
use crate::cancellation::CancellationHandlerMain;
use crate::config::ProxyConfig;
@@ -43,14 +50,6 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
use std::net::{IpAddr, SocketAddr};
use std::pin::{pin, Pin};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Instrument};
use utils::http::error::ApiError;
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub async fn task_main(

View File

@@ -2,77 +2,43 @@ use std::pin::pin;
use std::sync::Arc;
use bytes::Bytes;
use futures::future::select;
use futures::future::try_join;
use futures::future::Either;
use futures::StreamExt;
use futures::TryFutureExt;
use futures::future::{select, try_join, Either};
use futures::{StreamExt, TryFutureExt};
use http::header::AUTHORIZATION;
use http::Method;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::header;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{HeaderMap, Request};
use http_body_util::{BodyExt, Full};
use hyper::body::{Body, Incoming};
use hyper::http::{HeaderName, HeaderValue};
use hyper::{header, HeaderMap, Request, Response, StatusCode};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
use tokio::time;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::error::SqlState;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::NoTls;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
use tracing::{error, info};
use typed_json::json;
use url::Url;
use urlencoding;
use utils::http::error::ApiError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::auth::ComputeUserInfoParseError;
use crate::config::AuthenticationConfig;
use crate::config::HttpConfig;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::error::ErrorKind;
use crate::error::ReportableError;
use crate::error::UserFacingError;
use crate::metrics::HttpDirection;
use crate::metrics::Metrics;
use crate::proxy::run_until_cancelled;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::MetricCounter;
use crate::usage_metrics::MetricCounterRecorder;
use crate::DbName;
use crate::RoleName;
use super::backend::LocalProxyConnError;
use super::backend::PoolingBackend;
use super::conn_pool;
use super::conn_pool::AuthData;
use super::conn_pool::ConnInfo;
use super::conn_pool::ConnInfoWithAuth;
use super::backend::{LocalProxyConnError, PoolingBackend};
use super::conn_pool::{AuthData, ConnInfo, ConnInfoWithAuth};
use super::http_util::json_response;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::json::JsonConversionError;
use super::local_conn_pool;
use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError};
use super::{conn_pool, local_conn_pool};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestMonitoring;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{run_until_cancelled, NeonOptions};
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
use crate::{DbName, RoleName};
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]


@@ -1,13 +1,7 @@
use crate::proxy::ErrorSource;
use crate::{
cancellation::CancellationHandlerMain,
config::ProxyConfig,
context::RequestMonitoring,
error::{io_error, ReportableError},
metrics::Metrics,
proxy::{handle_client, ClientMode},
rate_limiter::EndpointRateLimiter,
};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use anyhow::Context as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use framed_websockets::{Frame, OpCode, WebSocketServer};
@@ -15,15 +9,17 @@ use futures::{Sink, Stream};
use hyper::upgrade::OnUpgrade;
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;
use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
use tracing::warn;
use crate::cancellation::CancellationHandlerMain;
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::error::{io_error, ReportableError};
use crate::metrics::Metrics;
use crate::proxy::{handle_client, ClientMode, ErrorSource};
use crate::rate_limiter::EndpointRateLimiter;
pin_project! {
/// This is a wrapper around a [`WebSocketStream`] that
/// implements [`AsyncRead`] and [`AsyncWrite`].
@@ -184,14 +180,11 @@ mod tests {
use framed_websockets::WebSocketServer;
use futures::{SinkExt, StreamExt};
use tokio::{
io::{duplex, AsyncReadExt, AsyncWriteExt},
task::JoinSet,
};
use tokio_tungstenite::{
tungstenite::{protocol::Role, Message},
WebSocketStream,
};
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
use tokio::task::JoinSet;
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use super::WebSocketRw;


@@ -1,19 +1,20 @@
use crate::config::TlsServerEndPoint;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::Metrics;
use bytes::BytesMut;
use pq_proto::framed::{ConnectionError, Framed};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
use rustls::ServerConfig;
use std::pin::Pin;
use std::sync::Arc;
use std::{io, task};
use bytes::BytesMut;
use pq_proto::framed::{ConnectionError, Framed};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
use rustls::ServerConfig;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
use crate::config::TlsServerEndPoint;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::Metrics;
/// Stream wrapper which implements libpq's protocol.
///
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]


@@ -1,36 +1,33 @@
//! Periodically collect proxy consumption metrics
//! and push them to a HTTP endpoint.
use crate::{
config::{MetricBackupCollectionConfig, MetricCollectionConfig},
context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
http,
intern::{BranchIdInt, EndpointIdInt},
};
use std::convert::Infallible;
use std::pin::pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Datelike, Timelike, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::{mapref::entry::Entry, DashMap};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::future::select;
use once_cell::sync::Lazy;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
pin::pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace, warn};
use utils::backoff;
use uuid::{NoContext, Timestamp};
use crate::config::{MetricBackupCollectionConfig, MetricCollectionConfig};
use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
use crate::http;
use crate::intern::{BranchIdInt, EndpointIdInt};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
@@ -485,19 +482,23 @@ async fn upload_events_chunk(
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use crate::{http, BranchId, EndpointId};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use http_body_util::BodyExt;
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use url::Url;
use super::*;
use crate::{http, BranchId, EndpointId};
#[tokio::test]
async fn metrics() {
type Report = EventChunk<'static, Event<Ids, String>>;


@@ -1,8 +1,9 @@
use std::pin::Pin;
use std::task;
use hashbrown::HashMap;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task;
use thiserror::Error;
use tokio::sync::oneshot;
@@ -99,9 +100,10 @@ impl<T> std::future::Future for Waiter<'_, T> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use super::*;
#[tokio::test]
async fn test_waiter() -> anyhow::Result<()> {
let waiters = Arc::new(Waiters::default());


@@ -1074,8 +1074,9 @@ impl Service {
/// the observed state of the tenant such that subsequent calls to [`TenantShard::get_reconcile_needed`]
/// will indicate that reconciliation is not needed.
#[instrument(skip_all, fields(
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
seq=%result.sequence,
tenant_id=%result.tenant_shard_id.tenant_id,
shard_id=%result.tenant_shard_id.shard_slug(),
))]
fn process_result(&self, result: ReconcileResult) {
let mut locked = self.inner.write().unwrap();


@@ -0,0 +1,38 @@
import threading

from fixtures.neon_fixtures import NeonEnv


def test_compute_restart(neon_simple_env: NeonEnv):
    env = neon_simple_env
    env.create_branch("publisher")
    pub = env.endpoints.create("publisher")
    pub.start()

    sub_timeline_id = env.create_branch("subscriber")
    sub = env.endpoints.create("subscriber")
    sub.start()

    n_records = 100000
    n_restarts = 200

    def insert_data(pub):
        with pub.cursor() as pcur:
            for i in range(0, n_records):
                pcur.execute("INSERT into t values (%s,random()*100000)", (i,))

    with pub.cursor() as pcur:
        with sub.cursor() as scur:
            pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
            scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")

    thread = threading.Thread(target=insert_data, args=(pub,), daemon=True)
    thread.start()

    for _ in range(n_restarts):
        # restart subscriber
        # time.sleep(2)
        sub.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, sub_timeline_id))
        sub.start()

    thread.join()
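The new test starts the insert thread but never inspects its work after the restart loop; a minimal, hypothetical follow-up assertion (reusing the `pub` endpoint, the `t` table, and `n_records` from the test above, and assuming the psycopg2-style cursor the fixtures expose) could look like this:

```python
def check_publisher_rows(pub, n_records: int) -> None:
    # Hypothetical sanity check, not part of the committed test:
    # verify the background insert thread wrote every row on the publisher.
    with pub.cursor() as pcur:
        pcur.execute("SELECT count(*) FROM t")
        assert pcur.fetchone()[0] == n_records
```

Calling `check_publisher_rows(pub, n_records)` right after `thread.join()` would catch the case where the writer thread died silently while the subscriber was being restarted.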


@@ -72,7 +72,7 @@ WITH (fillfactor='100');
# verify working set size after some index access of a few select pages only
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
log.info(f"working set size after some index access of a few select pages only {blocks}")
assert blocks < 12
assert blocks < 10
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):


@@ -19,6 +19,7 @@ from fixtures.metrics import (
parse_metrics,
)
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
wait_for_last_flush_lsn,
@@ -490,8 +491,8 @@ def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
n_threads = 16
barrier = threading.Barrier(n_threads)
def test_timeline(branch_name: str, timeline_id: TimelineId):
endpoint = env.endpoints.create_start(branch_name)
def test_timeline(branch_name: str, timeline_id: TimelineId, endpoint: Endpoint):
endpoint.start()
endpoint.stop()
# Use a barrier to make sure we restart endpoints at the same time
barrier.wait()
@@ -502,8 +503,12 @@ def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):
for i in range(0, n_threads):
branch_name = f"branch_{i}"
timeline_id = env.create_branch(branch_name)
w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id])
endpoint = env.endpoints.create(branch_name)
w = threading.Thread(target=test_timeline, args=[branch_name, timeline_id, endpoint])
workers.append(w)
# Only start the restarts once we're done creating all timelines & endpoints
for w in workers:
w.start()
for w in workers:


@@ -136,6 +136,17 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
"test_ancestor_branch_archive_branch1", tenant_id, "test_ancestor_branch_archive_parent"
)
with env.endpoints.create_start(
"test_ancestor_branch_archive_branch1", tenant_id=tenant_id
) as endpoint:
endpoint.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'data_content')",
"INSERT INTO foo SELECT FROM generate_series(1,1000)",
]
)
sum = endpoint.safe_psql("SELECT sum(key) from foo where key > 50")
ps_http.timeline_archival_config(
tenant_id,
leaf_timeline_id,
@@ -197,4 +208,10 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
)
assert leaf_detail["is_archived"] is False
with env.endpoints.create_start(
"test_ancestor_branch_archive_branch1", tenant_id=tenant_id
) as endpoint:
sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key > 50")
assert sum == sum_again
assert not timeline_offloaded(initial_timeline_id)