Compare commits

..

26 Commits

Author SHA1 Message Date
Alexander Bayandin
2c7ce7ca0e DO NOT MERGE: trigger pipeline on branch 2023-05-25 18:10:43 +01:00
Alexander Bayandin
0baf82296a Generate code coverage report in json and lcov formats 2023-05-25 18:10:43 +01:00
Alexander Bayandin
6af13ba964 Fix test_metric_collection 2023-05-25 17:24:02 +01:00
Joonas Koivunen
85e76090ea test: fix ancestor is stopping flakyness (#4234)
Flakyness most likely introduced in #4170, detected in
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4232/4980691289/index.html#suites/542b1248464b42cc5a4560f408115965/18e623585e47af33.

Opted to allow it globally because it can happen in other tests as well,
basically whenever compaction is enabled and we stop pageserver
gracefully.
2023-05-25 16:22:58 +00:00
Alexander Bayandin
08e7d2407b Storage: use Postgres 15 as default (#2809) 2023-05-25 15:55:46 +01:00
Alex Chi Z
ab2757f64a bump dependencies version (#4336)
proceeding https://github.com/neondatabase/neon/pull/4237, this PR bumps
AWS dependencies along with all other dependencies to the latest
compatible semver.

Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-25 10:21:15 -04:00
Christian Schwarz
e5617021a7 refactor: eliminate global storage_broker client state (#4318)
(This is prep work to make `Timeline::activate` infallible.)

This patch removes the global storage_broker client instance from the
pageserver codebase.

Instead, pageserver startup instantiates it and passes it down to the
`Timeline::activate` function, which in turn passes it to the
WalReceiver, which is the entity that actually uses it.

Patch series:

- #4316
- #4317
- #4318
- #4319
2023-05-25 16:47:42 +03:00
Christian Schwarz
83ba02b431 tenant_status: don't InternalServerError if tenant not found (#4337)
Note this also changes the status code to the (correct) 404. Not sure if
that's relevant to Console.

Context:
https://neondb.slack.com/archives/C04PSBP2SAF/p1684746238831449?thread_ts=1684742106.169859&cid=C04PSBP2SAF

Atop #4300 because it cleans up the mgr::get_tenant() error type and I want eyes on that PR.
2023-05-25 11:38:04 +02:00
Christian Schwarz
37ecebe45b mgr::get_tenant: distinguished error type (#4300)
Before this patch, it would use error type `TenantStateError` which has
many more error variants than can actually happen with
`mgr::get_tenant`.

Along the way, I also introduced `SetNewTenantConfigError` because it
uses `mgr::get_tenant` and also can only fail in much fewer ways than
`TenantStateError` suggests.

The new `page_service.rs`'s `GetActiveTimelineError` and
`GetActiveTenantError` types were necessary to avoid an `Other` variant
on the `GetTenantError`.

This patch is a by-product of reading code that subscribes to
`Tenant::state` changes.
Can't really connect it to any given project.
2023-05-25 11:37:12 +02:00
Sasha Krassovsky
6052ecee07 Add connector extension to send Role/Database updates to console (#3891)
## Describe your changes

## Issue ticket number and link

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-05-25 12:36:57 +03:00
Christian Schwarz
e11ba24ec5 tenant loops: operate on the Arc<Tenant> directly (#4298)
(Instead of going through mgr every iteration.)

The `wait_for_active_tenant` function's `wait` argument could be removed
because it was only used for the loop that waits for the tenant to show
up in the tenants map. Since we're passing the tenant in, we now longer
need to get it from the tenants map.

NB that there's no guarantee that the tenant object is in the tenants
map at the time the background loop function starts running. But the
tenant mgr guarantees that it will be quite soon. See
`tenant_map_insert` way upwards in the call hierarchy for details.

This is prep work to eliminate `subscribe_for_state_updates` (PR #4299 )

Fixes: #3501
2023-05-25 10:49:09 +02:00
Alex Chi Z
f276f21636 ci: use eu-central-1 bucket (#4315)
Probably increase CI success rate.

---------

Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-25 00:00:21 +03:00
Alex Chi Z
7126197000 pagectl: refactor ctl and support dump kv in delta (#4268)
This PR refactors the original page_binutils with a single tool pagectl,
use clap derive for better command line parsing, and adds the dump kv
tool to extract information from delta file. This helps me better
understand what's inside the page server. We can add support for other
types of file and more functionalities in the future.

---------

Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-24 19:36:07 +03:00
Christian Schwarz
afc48e2cd9 refactor responsibility for tenant/timeline activation (#4317)
(This is prep work to make `Timeline::activate()` infallible.)

The current possibility for failure in `Timeline::activate()` is the
broker client's presence / absence. It should be an assert, but we're
careful with these. So, I'm planning to pass in the broker client to
activate(), thereby eliminating the possiblity of its absence.

In the unit tests, we don't have a broker client. So, I thought I'd be
in trouble because the unit tests also called `activate()` before this
PR.

However, closer inspection reveals a long-standing FIXME about this,
which is addressed by this patch.

It turns out that the unit tests don't actually need the background
loops to be running. They just need the state value to be `Active`. So,
for the tests, we just set it to that value but don't spawn the
background loops.

We'll need to revisit this if we ever do more Rust unit tests in the
future. But right now, this refactoring improves the code, so, let's
revisit when we get there.

Patch series:

- #4316
- #4317
- #4318
- #4319
2023-05-24 16:54:11 +02:00
Christian Schwarz
df52587bef attach-time tenant config (#4255)
This PR adds support for supplying the tenant config upon /attach.

Before this change, when relocating a tenant using `/detach` and
`/attach`, the tenant config after `/attach` would be the default config
from `pageserver.toml`.
That is undesirable for settings such as the PITR-interval: if the
tenant's config on the source was `30 days` and the default config on
the attach-side is `7 days`, then the first GC run would eradicate 23
days worth of PITR capability.

The API change is backwards-compatible: if the body is empty, we
continue to use the default config.
We'll remove that capability as soon as the cloud.git code is updated to
use attach-time tenant config
(https://github.com/neondatabase/neon/issues/4282 keeps track of this).

unblocks https://github.com/neondatabase/cloud/issues/5092 
fixes https://github.com/neondatabase/neon/issues/1555
part of https://github.com/neondatabase/neon/issues/886 (Tenant
Relocation)

Implementation
==============

The preliminary PRs for this work were (most-recent to least-recent)

* https://github.com/neondatabase/neon/pull/4279
* https://github.com/neondatabase/neon/pull/4267
* https://github.com/neondatabase/neon/pull/4252
* https://github.com/neondatabase/neon/pull/4235
2023-05-24 17:46:30 +03:00
Alexander Bayandin
35bb10757d scripts/ingest_perf_test_result.py: increase connection timeout (#4329)
## Problem

Sometimes default connection timeout is not enough to connect to the DB
with perf test results, [an
example](https://github.com/neondatabase/neon/actions/runs/5064263522/jobs/9091692868#step:10:332).

Similar changes were made for similar scripts:
- For `scripts/flaky_tests.py` in
https://github.com/neondatabase/neon/pull/4096
- For `scripts/ingest_regress_test_result.py` in
https://github.com/neondatabase/neon/pull/2367 (from the very
begginning)

## Summary of changes
- Connection timeout increased to 30s for
`scripts/ingest_perf_test_result.py`
2023-05-24 10:11:24 -04:00
Alexander Bayandin
2a3f54002c test_runner: update dependencies (#4328)
## Problem

`pytest` 6 truncates error messages and this is not configured.
It's fixed in `pytest` 7, it prints the whole message (truncating limit
is higher) if `--verbose` is set (it's set on CI).

## Summary of changes
- `pytest` and `pytest` plugins are updated to their latest versions
- linters (`black` and `ruff`) are updated to their latest versions
- `mypy` and types are updated to their latest versions, new warnings
are fixed
- while we're here, allure updated its latest version as well
2023-05-24 12:47:01 +01:00
Joonas Koivunen
f3769d45ae chore: upgrade tokio to 1.28.1 (#4294)
no major changes, but this is the most recent LTS release and will be
required by #4292.
2023-05-24 08:15:39 +03:00
Arseny Sher
c200ebc096 proxy: log endpoint name everywhere.
Checking out proxy logs for the endpoint is a frequent (often first) operation
during user issues investigation; let's remove endpoint id -> session id mapping
annoying extra step here.
2023-05-24 09:11:23 +04:00
Konstantin Knizhnik
417f37b2e8 Pass set of wanted image layers from GC to compaction (#3673)
## Describe your changes

Right now the only criteria for image layer generation is number of
delta layer since last image layer.
If we have "stairs" layout of delta layers (see link below) then it can
happen that there a lot of old delta layers which can not be reclaimed
by GC because are not fully covered with image layers.

This PR constructs list of "wanted" image layers in GC (which image
layers are needed to be able to remove old layers)
and pass this list to compaction task which performs generation of image
layers.
So right now except deltas count criteria we also take in account
"wishes" of GC.

## Issue ticket number and link

See 
https://neondb.slack.com/archives/C033RQ5SPDH/p1676914249982519


## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-05-24 08:01:41 +03:00
sharnoff
7f1973f8ac bump vm-builder, use Neon-specific version (#4155)
In the v0.6.0 release, vm-builder was changed to be Neon-specific, so
it's handling all the stuff that Dockerfile.vm-compute-node used to do.

This commit bumps vm-builder to v0.7.3-alpha3.
2023-05-23 15:20:20 -07:00
Christian Schwarz
00f7fc324d tenant_map_insert: don't expose the vacant entry to the closure (#4316)
This tightens up the API a little.
Byproduct of some refactoring work that I'm doing right now.
2023-05-23 15:16:12 -04:00
Stas Kelvich
dad3519351 Add SQL-over-HTTP endpoint to Proxy
This commit introduces an SQL-over-HTTP endpoint in the proxy, with a JSON
response structure resembling that of the node-postgres driver. This method,
using HTTP POST, achieves smaller amortized latencies in edge setups due to
fewer round trips and an enhanced open connection reuse by the v8 engine.

This update involves several intricacies:
1. SQL injection protection: We employed the extended query protocol, modifying
   the rust-postgres driver to send queries in one roundtrip using a text
   protocol rather than binary, bypassing potential issues like those identified
   in https://github.com/sfackler/rust-postgres/issues/1030.

2. Postgres type compatibility: As not all postgres types have binary
   representations (e.g., acl's in pg_class), we adjusted rust-postgres to
   respond with text protocol, simplifying serialization and fixing queries with
   text-only types in response.

3. Data type conversion: Considering JSON supports fewer data types than
   Postgres, we perform conversions where possible, passing all other types as
   strings. Key conversions include:
   - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
     text)
   - postgres bool, null, text -> json bool, null, string
   - postgres array -> json array
   - postgres json and jsonb -> json object

4. Alignment with node-postgres: To facilitate integration with js libraries,
   we've matched the response structure of node-postgres, returning command tags
   and column oids. Command tag capturing was added to the rust-postgres
   functionality as part of this change.
2023-05-23 20:01:40 +03:00
dependabot[bot]
d75b4e0f16 Bump requests from 2.28.1 to 2.31.0 (#4305) 2023-05-23 14:54:51 +01:00
Christian Schwarz
4d41b2d379 fix: max_lsn_wal_lag broken in tenant conf (#4279)
This patch fixes parsing of the `max_lsn_wal_lag` tenant config item.
We were incorrectly expecting a string before, but the type is a
NonZeroU64.

So, when setting it in the config, the (updated) test case would fail
with

```
 E       psycopg2.errors.InternalError_: Tenant a1fa9cc383e32ddafb73ff920de5f2e6 will not become active. Current state: Broken due to: Failed to parse config from file '.../repo/tenants/a1fa9cc383e32ddafb73ff920de5f2e6/config' as pageserver config: configure option max_lsn_wal_lag is not a string. Backtrace:
```

So, not even the assertions added are necessary.

The test coverage for tenant config is rather thin in general.
For example, the `test_tenant_conf.py` test doesn't cover all the
options.

I'll add a new regression test as part of attach-time-tenant-conf PR
https://github.com/neondatabase/neon/pull/4255
2023-05-23 16:29:59 +03:00
Shany Pozin
d6cf347670 Add an option to set "latest gc cutoff lsn" in pageserver binutils (#4290)
## Problem
[#2539](https://github.com/neondatabase/neon/issues/2539)
## Summary of changes
Add support  for latest_gc_cutoff_lsn update in pageserver_binutils
2023-05-23 15:48:43 +03:00
73 changed files with 4084 additions and 1305 deletions

View File

@@ -57,14 +57,14 @@ runs:
if ! which allure; then
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
wget -q https://github.com/allure-framework/allure2/releases/download/${ALLURE_VERSION}/${ALLURE_ZIP}
echo "${ALLURE_ZIP_MD5} ${ALLURE_ZIP}" | md5sum -c
echo "${ALLURE_ZIP_SHA256} ${ALLURE_ZIP}" | sha256sum --check
unzip -q ${ALLURE_ZIP}
echo "$(pwd)/allure-${ALLURE_VERSION}/bin" >> $GITHUB_PATH
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.22.0
ALLURE_ZIP_MD5: d5c9f0989b896482536956340a7d5ec9
ALLURE_VERSION: 2.22.1
ALLURE_ZIP_SHA256: fdc7a62d94b14c5e0bf25198ae1feded6b005fdbed864b4d3cb4e5e901720b0b
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this
- name: Acquire lock

View File

@@ -36,14 +36,6 @@ inputs:
description: 'Region name for real s3 tests'
required: false
default: ''
real_s3_access_key_id:
description: 'Access key id'
required: false
default: ''
real_s3_secret_access_key:
description: 'Secret access key'
required: false
default: ''
rerun_flaky:
description: 'Whether to rerun flaky tests'
required: false
@@ -104,8 +96,6 @@ runs:
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')

View File

@@ -5,6 +5,7 @@ on:
branches:
- main
- release
- bayandin/code-coverage
pull_request:
defaults:
@@ -346,10 +347,8 @@ jobs:
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: ci-tests-s3
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_flaky: true
pg_version: ${{ matrix.pg_version }}
env:
@@ -496,19 +495,29 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage \
--dir=/tmp/coverage report \
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=json
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -663,6 +672,9 @@ jobs:
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]
@@ -777,7 +789,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.4.6
VM_BUILDER_VERSION: v0.7.3-alpha3
steps:
- name: Checkout
@@ -787,21 +799,18 @@ jobs:
- name: Downloading vm-builder
run: |
curl -L https://github.com/neondatabase/neonvm/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
chmod +x vm-builder
# Note: we need a separate pull step here because otherwise vm-builder will try to pull, and
# it won't have the proper authentication (written at v0.6.0)
- name: Pulling compute-node image
run: |
docker pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Building VM compute-node rootfs
run: |
docker build -t temp-vm-compute-node --build-arg SRC_IMAGE=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -f Dockerfile.vm-compute-node .
- name: Build vm image
run: |
# note: as of 2023-01-12, vm-builder requires a trailing ":latest" for local images
./vm-builder -use-inittab -src=temp-vm-compute-node:latest -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

647
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,6 +3,7 @@ members = [
"compute_tools",
"control_plane",
"pageserver",
"pageserver/ctl",
"proxy",
"safekeeper",
"storage_broker",
@@ -22,7 +23,7 @@ async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.25"
aws-sdk-s3 = "0.27"
aws-smithy-http = "0.55"
aws-credential-types = "0.55"
aws-types = "0.55"
@@ -126,11 +127,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
## Other git libraries
@@ -166,7 +167,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
# Changes the MAX_THREADS limit from 4096 to 32768.
# This is a temporary workaround for using tracing from many threads in safekeepers code,

View File

@@ -47,8 +47,7 @@ RUN set -e \
&& mold -run cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pageserver_binutils \
--bin draw_timeline_dir \
--bin pagectl \
--bin safekeeper \
--bin storage_broker \
--bin proxy \
@@ -73,8 +72,7 @@ RUN set -e \
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin

View File

@@ -632,6 +632,7 @@ RUN apt update && \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4-openssl-dev \
procps && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -1,70 +0,0 @@
# Note: this file *mostly* just builds on Dockerfile.compute-node
ARG SRC_IMAGE
ARG VM_INFORMANT_VERSION=v0.1.14
# on libcgroup update, make sure to check bootstrap.sh for changes
ARG LIBCGROUP_VERSION=v2.0.3
# Pull VM informant, to copy from later
FROM neondatabase/vm-informant:$VM_INFORMANT_VERSION as informant
# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-informant
# requires cgroup v2, so we'll build cgroup-tools ourselves.
FROM debian:bullseye-slim as libcgroup-builder
ARG LIBCGROUP_VERSION
RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install
# Combine, starting from non-VM compute node image.
FROM $SRC_IMAGE as base
# Temporarily set user back to root so we can run adduser, set inittab
USER root
RUN adduser vm-informant --disabled-password --no-create-home
RUN set -e \
&& rm -f /etc/inittab \
&& touch /etc/inittab
RUN set -e \
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
&& CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab
USER postgres
ADD vm-cgconfig.conf /etc/cgconfig.conf
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
ENTRYPOINT ["/usr/sbin/cgexec", "-g", "*:neon-postgres", "/usr/local/bin/compute_ctl"]

View File

@@ -362,6 +362,8 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
client.simple_query("SET neon.forward_ddl = false")?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
@@ -403,7 +405,9 @@ impl ComputeNode {
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;

View File

@@ -121,9 +121,8 @@ impl RoleExt for Role {
/// string of arguments.
fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
let mut params: String = "LOGIN".to_string();
let mut params: String = self.options.as_pg_options();
params.push_str(" LOGIN");
if let Some(pass) = &self.encrypted_password {
// Some time ago we supported only md5 and treated all encrypted_password as md5.

View File

@@ -62,7 +62,7 @@ fn do_control_plane_request(
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(
base_uri: &str,

View File

@@ -16,7 +16,7 @@ mod pg_helpers_tests {
);
assert_eq!(
spec.cluster.roles.first().unwrap().to_pg_options(),
"LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
" LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
);
}

View File

@@ -41,7 +41,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "14";
const DEFAULT_PG_VERSION: &str = "15";
fn default_conf() -> String {
format!(

View File

@@ -24,7 +24,7 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 14;
pub const DEFAULT_PG_VERSION: u32 = 15;
//
// This data structures represents neon_local CLI config

View File

@@ -234,6 +234,28 @@ impl TenantConfigRequest {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantAttachRequest {
pub config: TenantAttachConfig,
}
/// Newtype to enforce deny_unknown_fields on TenantConfig for
/// its usage inside `TenantAttachRequest`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TenantAttachConfig {
#[serde(flatten)]
allowing_unknown_fields: TenantConfig,
}
impl std::ops::Deref for TenantAttachConfig {
type Target = TenantConfig;
fn deref(&self) -> &Self::Target {
&self.allowing_unknown_fields
}
}
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
@@ -796,5 +818,17 @@ mod tests {
"expect unknown field `unknown_field` error, got: {}",
err
);
let attach_request = json!({
"config": {
"unknown_field": "unknown_value".to_string(),
},
});
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
}
}

View File

@@ -8,12 +8,26 @@ use super::error::ApiError;
pub async fn json_request<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
let whole_body = hyper::body::aggregate(request.body_mut())
json_request_or_empty_body(request)
.await?
.context("missing request body")
.map_err(ApiError::BadRequest)
}
/// Will be removed as part of https://github.com/neondatabase/neon/issues/4282
pub async fn json_request_or_empty_body<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<Option<T>, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;
serde_json::from_reader(whole_body.reader())
if body.remaining() == 0 {
return Ok(None);
}
serde_json::from_reader(body.reader())
.context("Failed to parse json request")
.map(Some)
.map_err(ApiError::BadRequest)
}

18
pageserver/ctl/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "pagectl"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
bytes.workspace = true
clap = { workspace = true, features = ["string"] }
git-version.workspace = true
pageserver = { path = ".." }
postgres_ffi.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true

View File

@@ -12,7 +12,7 @@
//! Example use:
//! ```
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! $ grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! $ firefox out.svg
//! ```
//!
@@ -62,7 +62,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
(keys, lsns)
}
fn main() -> Result<()> {
pub fn main() -> Result<()> {
// Parse layer filenames from stdin
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
let stdin = io::stdin();

View File

@@ -6,7 +6,7 @@ use anyhow::Result;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
use std::{fs, path::Path, str};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
@@ -18,12 +18,14 @@ use pageserver::virtual_file::VirtualFile;
use utils::{bin_ser::BeSer, lsn::Lsn};
use crate::AnalyzeLayerMapCmd;
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
const DEFAULT_MAX_HOLES: usize = 10;
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(PartialEq, Eq)]
struct Hole(Range<Key>);
pub struct Hole(Range<Key>);
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
@@ -39,11 +41,11 @@ impl PartialOrd for Hole {
}
}
struct LayerFile {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
is_delta: bool,
holes: Vec<Hole>,
pub(crate) struct LayerFile {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub is_delta: bool,
pub holes: Vec<Hole>,
}
impl LayerFile {
@@ -67,7 +69,7 @@ impl LayerFile {
}
}
fn parse_filename(name: &str) -> Option<LayerFile> {
pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
let split: Vec<&str> = name.split("__").collect();
if split.len() != 2 {
return None;
@@ -127,18 +129,9 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
Ok(holes)
}
fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
return Ok(());
}
let storage_path = PathBuf::from_str(&args[1])?;
let max_holes = if args.len() > 2 {
args[2].parse::<usize>().unwrap()
} else {
DEFAULT_MAX_HOLES
};
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);

View File

@@ -0,0 +1,169 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Subcommand;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
tenant::{
block_io::FileBlockReader, disk_btree::VisitDirection,
storage_layer::delta_layer::DELTA_KEY_SIZE,
},
virtual_file::VirtualFile,
};
use std::fs;
use utils::bin_ser::BeSer;
use crate::layer_map_analyzer::parse_filename;
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
/// List all tenants and timelines under the pageserver path
///
/// Example: `cargo run --bin pagectl layer list .neon/`
List { path: PathBuf },
/// List all layers of a given tenant and timeline
///
/// Example: `cargo run --bin pagectl layer list .neon/`
ListLayer {
path: PathBuf,
tenant: String,
timeline: String,
},
/// Dump all information of a layer file
DumpLayer {
path: PathBuf,
tenant: String,
timeline: String,
/// The id from list-layer command
id: usize,
},
}
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
use pageserver::tenant::blob_io::BlobCursor;
use pageserver::tenant::block_io::BlockReader;
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
actual_summary.index_root_blk,
&file,
);
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
let mut all = vec![];
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value_offset| {
let curr = Key::from_slice(&key[..KEY_SIZE]);
all.push((curr, BlobRef(value_offset)));
true
},
)?;
let mut cursor = BlockCursor::new(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos())?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
Ok(())
}
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join("tenants"))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
println!("tenant {}", tenant.file_name().to_string_lossy());
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;
}
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
}
LayerCmd::ListLayer {
path,
tenant,
timeline,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
idx += 1;
}
}
}
LayerCmd::DumpLayer {
path,
tenant,
timeline,
id,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
if *id == idx {
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if layer_file.is_delta {
read_delta_file(layer.path())?;
} else {
anyhow::bail!("not supported yet :(");
}
break;
}
idx += 1;
}
}
}
}
Ok(())
}

179
pageserver/ctl/src/main.rs Normal file
View File

@@ -0,0 +1,179 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it with all ways possible
//! and prints its interpreted context.
//!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod draw_timeline_dir;
mod layer_map_analyzer;
mod layers;
use clap::{Parser, Subcommand};
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
version = GIT_VERSION,
about = "Neon Pageserver binutils",
long_about = "Reads pageserver (and related) binary files management utility"
)]
#[command(propagate_version = true)]
struct CliOpts {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
#[command(subcommand)]
Layer(LayerCmd),
}
/// Read and update pageserver metadata file
#[derive(Parser)]
struct MetadataCmd {
/// Input metadata file path
metadata_path: PathBuf,
/// Replace disk consistent Lsn
disk_consistent_lsn: Option<Lsn>,
/// Replace previous record Lsn
prev_record_lsn: Option<Lsn>,
/// Replace latest gc cuttoff
latest_gc_cuttoff: Option<Lsn>,
}
#[derive(Parser)]
struct PrintLayerFileCmd {
/// Pageserver data path
path: PathBuf,
}
#[derive(Parser)]
struct AnalyzeLayerMapCmd {
/// Pageserver data path
path: PathBuf,
/// Max holes
max_holes: Option<usize>,
}
fn main() -> anyhow::Result<()> {
let cli = CliOpts::parse();
match cli.command {
Commands::Layer(cmd) => {
layers::main(&cmd)?;
}
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}
Commands::AnalyzeLayerMap(cmd) => {
layer_map_analyzer::main(&cmd)?;
}
Commands::PrintLayerFile(cmd) => {
if let Err(e) = read_pg_control_file(&cmd.path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&cmd.path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(
MetadataCmd {
metadata_path: path,
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cuttoff,
}: &MetadataCmd,
) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = prev_record_lsn {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(*prev_record_lsn),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(latest_gc_cuttoff) = latest_gc_cuttoff {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}

View File

@@ -9,6 +9,7 @@ use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use remote_storage::GenericRemoteStorage;
use tracing::*;
@@ -18,9 +19,7 @@ use pageserver::{
context::{DownloadBehavior, RequestContext},
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
},
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
@@ -276,7 +275,18 @@ fn start_pageserver(
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
})
.with_context(|| {
format!(
"create broker client for uri={:?} keepalive_interval={:?}",
&conf.broker_endpoint, conf.broker_keepalive_interval,
)
})?;
// Initialize authentication for incoming connections
let http_auth;
@@ -326,7 +336,11 @@ fn start_pageserver(
let remote_storage = create_remote_storage_client(conf)?;
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
))?;
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -351,6 +365,7 @@ fn start_pageserver(
conf,
launch_ts,
http_auth,
broker_client.clone(),
remote_storage,
disk_usage_eviction_state,
)?
@@ -427,6 +442,7 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
conf,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,

View File

@@ -1,157 +0,0 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it with all ways possible
//! and prints its interpreted context.
//!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::Context;
use clap::{value_parser, Arg, Command};
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
const METADATA_SUBCOMMAND: &str = "metadata";
fn main() -> anyhow::Result<()> {
let arg_matches = cli().get_matches();
match arg_matches.subcommand() {
Some((subcommand_name, subcommand_matches)) => {
let path = subcommand_matches
.get_one::<PathBuf>("metadata_path")
.context("'metadata_path' argument is missing")?
.to_path_buf();
anyhow::ensure!(
subcommand_name == METADATA_SUBCOMMAND,
"Unknown subcommand {subcommand_name}"
);
handle_metadata(&path, subcommand_matches)?;
}
None => {
let path = arg_matches
.get_one::<PathBuf>("path")
.context("'path' argument is missing")?
.to_path_buf();
println!(
"No subcommand specified, attempting to guess the format for file {}",
path.display()
);
if let Err(e) = read_pg_control_file(&path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = arg_matches.get_one::<String>("disk_consistent_lsn") {
meta = TimelineMetadata::new(
Lsn::from_str(disk_consistent_lsn)?,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = arg_matches.get_one::<String>("prev_record_lsn") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(Lsn::from_str(prev_record_lsn)?),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}
fn cli() -> Command {
Command::new("Neon Pageserver binutils")
.about("Reads pageserver (and related) binary files management utility")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Input file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.subcommand(
Command::new(METADATA_SUBCOMMAND)
.about("Read and update pageserver metadata file")
.arg(
Arg::new("metadata_path")
.help("Input metadata file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.arg(
Arg::new("disk_consistent_lsn")
.long("disk_consistent_lsn")
.help("Replace disk consistent Lsn"),
)
.arg(
Arg::new("prev_record_lsn")
.long("prev_record_lsn")
.help("Replace previous record Lsn"),
),
)
}
#[test]
fn verify_cli() {
cli().debug_assert();
}

View File

@@ -1,48 +0,0 @@
//! The broker client instance of the pageserver, created during pageserver startup.
//! Used by each timelines' [`walreceiver`].
use crate::config::PageServerConf;
use anyhow::Context;
use once_cell::sync::OnceCell;
use storage_broker::BrokerClientChannel;
use tracing::*;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}

View File

@@ -797,7 +797,8 @@ impl PageServerConf {
)?);
}
if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?);
t_conf.max_lsn_wal_lag =
Some(deserialize_from_item("max_lsn_wal_lag", max_lsn_wal_lag)?);
}
if let Some(trace_read_requests) = item.get("trace_read_requests") {
t_conf.trace_read_requests =

View File

@@ -363,11 +363,29 @@ paths:
* MUST NOT ASSUME that the request has been lost, based on the observation
that a subsequent tenant status request returns 404. The request may
still be in flight. It must be retried.
The client SHOULD supply a `TenantConfig` for the tenant in the request body.
Settings specified in the config override the pageserver's defaults.
It is guaranteed that the config settings are applied before the pageserver
starts operating on the tenant. E.g., if the config specifies a specific
PITR interval for a tenant, then that setting will be in effect before the
pageserver starts the garbage collection loop. This enables a client to
guarantee a specific PITR setting across detach/attach cycles.
The pageserver will reject the request if it cannot parse the config, or
if there are any unknown fields in it.
If the client does not supply a config, the pageserver will use its defaults.
This behavior is deprecated: https://github.com/neondatabase/neon/issues/4282
requestBody:
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/TenantAttachRequest"
responses:
"202":
description: Tenant attaching scheduled
"400":
description: Error when no tenant id found in path parameters
content:
application/json:
schema:
@@ -922,6 +940,13 @@ components:
new_tenant_id:
type: string
format: hex
TenantAttachRequest:
type: object
required:
- config
properties:
config:
$ref: '#/components/schemas/TenantConfig'
TenantConfigRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'

View File

@@ -5,12 +5,14 @@ use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::endpoint::RequestSpan;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
@@ -23,7 +25,9 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
@@ -50,6 +54,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -58,6 +63,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
@@ -69,6 +75,7 @@ impl State {
auth,
allowlist_routes,
remote_storage,
broker_client,
disk_usage_eviction_state,
})
}
@@ -139,6 +146,36 @@ impl From<TenantStateError> for ApiError {
}
}
impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
e @ GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
// in fact exist locally. If we did, the caller could draw the conclusion
// that it can attach the tenant to another PS and we'd be in split-brain.
//
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<SetNewTenantConfigError> for ApiError {
fn from(e: SetNewTenantConfigError) -> ApiError {
match e {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid))
}
e @ SetNewTenantConfigError::Persist(_) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
@@ -158,7 +195,7 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
match value {
// Report Precondition failed so client can distinguish between
// "tenant is missing" case from "timeline is missing"
Tenant(TenantStateError::NotFound(..)) => {
Tenant(GetTenantError::NotFound(..)) => {
ApiError::PreconditionFailed("Requested tenant is missing")
}
Tenant(t) => ApiError::from(t),
@@ -270,6 +307,8 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
@@ -277,6 +316,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
state.broker_client.clone(),
&ctx,
)
.await {
@@ -386,11 +426,16 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
json_response(StatusCode::OK, result)
}
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
async fn tenant_attach_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
let tenant_conf = match maybe_body {
Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
None => TenantConfOpt::default(),
};
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
info!("Handling tenant attach {tenant_id}");
@@ -401,9 +446,8 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
mgr::attach_tenant(
state.conf,
tenant_id,
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
TenantConfOpt::default(),
tenant_conf,
state.broker_client.clone(),
remote_storage.clone(),
&ctx,
)
@@ -453,9 +497,15 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
mgr::load_tenant(
state.conf,
tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
}
@@ -507,7 +557,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
}
let state = tenant.current_state();
Ok(TenantInfo {
Result::<_, ApiError>::Ok(TenantInfo {
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
@@ -515,8 +565,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
})
}
.instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
.await
.map_err(ApiError::InternalServerError)?;
.await?;
json_response(StatusCode::OK, tenant_info)
}
@@ -740,6 +789,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
state.conf,
tenant_conf,
target_tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
@@ -1095,6 +1145,7 @@ pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
broker_client: BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
@@ -1141,8 +1192,14 @@ pub fn make_router(
Ok(router
.data(Arc::new(
State::new(conf, auth, remote_storage, disk_usage_eviction_state)
.context("Failed to initialize router state")?,
State::new(
conf,
auth,
remote_storage,
broker_client,
disk_usage_eviction_state,
)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(

View File

@@ -5,7 +5,7 @@ use std::ops::Range;
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct KeySpace {
/// Contiguous ranges of keys that belong to the key space. In key order,
/// and with no overlap.
@@ -61,6 +61,18 @@ impl KeySpace {
KeyPartitioning { parts }
}
///
/// Check if key space contains overlapping range
///
pub fn overlaps(&self, range: &Range<Key>) -> bool {
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
Ok(0) => false,
Err(0) => false,
Ok(index) => self.ranges[index - 1].end > range.start,
Err(index) => self.ranges[index - 1].end > range.start,
}
}
}
///
@@ -129,3 +141,226 @@ impl KeySpaceAccum {
}
}
}
///
/// A helper object, to collect a set of keys and key ranges into a KeySpace
/// object. Key ranges may be inserted in any order and can overlap.
///
#[derive(Clone, Debug, Default)]
pub struct KeySpaceRandomAccum {
ranges: Vec<Range<Key>>,
}
impl KeySpaceRandomAccum {
pub fn new() -> Self {
Self { ranges: Vec::new() }
}
pub fn add_key(&mut self, key: Key) {
self.add_range(singleton_range(key))
}
pub fn add_range(&mut self, range: Range<Key>) {
self.ranges.push(range);
}
pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {
self.ranges.sort_by_key(|r| r.start);
let mut start = self.ranges.first().unwrap().start;
let mut end = self.ranges.first().unwrap().end;
for r in self.ranges {
assert!(r.start >= start);
if r.start > end {
ranges.push(start..end);
start = r.start;
end = r.end;
} else if r.end > end {
end = r.end;
}
}
ranges.push(start..end);
}
KeySpace { ranges }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fmt::Write;
// Helper function to create a key range.
//
// Make the tests below less verbose.
fn kr(irange: Range<i128>) -> Range<Key> {
Key::from_i128(irange.start)..Key::from_i128(irange.end)
}
#[allow(dead_code)]
fn dump_keyspace(ks: &KeySpace) {
for r in ks.ranges.iter() {
println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
}
}
fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
if actual.ranges != expected {
let mut msg = String::new();
writeln!(msg, "expected:").unwrap();
for r in &expected {
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
}
writeln!(msg, "got:").unwrap();
for r in &actual.ranges {
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
}
panic!("{}", msg);
}
}
#[test]
fn keyspace_add_range() {
// two separate ranges
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(20..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
// two separate ranges, added in reverse order
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(20..30));
ks.add_range(kr(0..10));
// add range that is adjacent to the end of an existing range
//
// #####
// #####
ks.add_range(kr(0..10));
ks.add_range(kr(10..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that is adjacent to the start of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..30));
ks.add_range(kr(0..10));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that overlaps with the end of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(5..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that overlaps with the start of an existing range
//
// #####
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(5..30));
ks.add_range(kr(0..10));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that is fully covered by an existing range
//
// #########
// #####
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..30));
ks.add_range(kr(10..20));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add range that extends an existing range from both ends
//
// #####
// #########
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..20));
ks.add_range(kr(0..30));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
// add a range that overlaps with two existing ranges, joining them
//
// ##### #####
// #######
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(0..10));
ks.add_range(kr(20..30));
ks.add_range(kr(5..25));
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
}
#[test]
fn keyspace_overlaps() {
let mut ks = KeySpaceRandomAccum::default();
ks.add_range(kr(10..20));
ks.add_range(kr(30..40));
let ks = ks.to_keyspace();
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(0..5)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(5..9)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(5..10)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(5..11)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(10..15)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(15..20)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(15..25)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(22..28)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(25..30)));
// ##### #####
// xxxx
assert!(ks.overlaps(&kr(35..35)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(40..45)));
// ##### #####
// xxxx
assert!(!ks.overlaps(&kr(45..50)));
// ##### #####
// xxxxxxxxxxx
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
}
}

View File

@@ -1,6 +1,5 @@
mod auth;
pub mod basebackup;
pub mod broker_client;
pub mod config;
pub mod consumption_metrics;
pub mod context;
@@ -36,7 +35,7 @@ use tracing::info;
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 14;
pub const DEFAULT_PG_VERSION: u32 = 15;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -50,7 +50,9 @@ use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::trace::Tracer;
@@ -172,6 +174,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
///
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
@@ -213,7 +216,14 @@ pub async fn libpq_listener_main(
None,
"serving compute connection task",
false,
page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx),
page_service_conn_main(
conf,
broker_client.clone(),
local_auth,
socket,
auth_type,
connection_ctx,
),
);
}
Err(err) => {
@@ -230,6 +240,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
@@ -266,7 +277,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -324,6 +335,7 @@ impl PageRequestMetrics {
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
@@ -337,11 +349,13 @@ struct PageServerHandler {
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
_conf: conf,
broker_client,
auth,
claims: None,
connection_ctx,
@@ -494,7 +508,12 @@ impl PageServerHandler {
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.import_basebackup_from_tar(
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),
&ctx,
)
.await?;
// Read the end of the tar archive.
@@ -1131,7 +1150,9 @@ enum GetActiveTenantError {
wait_time: Duration,
},
#[error(transparent)]
Other(#[from] anyhow::Error),
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
}
impl From<GetActiveTenantError> for QueryError {
@@ -1140,7 +1161,8 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::Other(e) => QueryError::Other(e),
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
}
}
}
@@ -1156,13 +1178,16 @@ async fn get_active_tenant_with_timeout(
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e) => return Err(GetActiveTenantError::Other(e.into())),
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active=false")
}
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(wait_error)) => Err(GetActiveTenantError::Other(wait_error)),
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
@@ -1177,13 +1202,34 @@ async fn get_active_tenant_with_timeout(
}
}
#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
#[error(transparent)]
Tenant(GetActiveTenantError),
#[error(transparent)]
Timeline(anyhow::Error),
}
impl From<GetActiveTimelineError> for QueryError {
fn from(e: GetActiveTimelineError) -> Self {
match e {
GetActiveTimelineError::Tenant(e) => e.into(),
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
}
}
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTenantError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
let timeline = tenant.get_timeline(timeline_id, true)?;
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
Ok(timeline)
}

View File

@@ -1600,9 +1600,7 @@ pub fn create_test_timeline(
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)?
.initialize(ctx)?;
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;

View File

@@ -16,6 +16,7 @@ use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
@@ -77,7 +78,7 @@ use utils::{
lsn::{Lsn, RecordLsn},
};
mod blob_io;
pub mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
@@ -184,24 +185,14 @@ impl UninitializedTimeline<'_> {
/// Ensures timeline data is valid, loads it into pageserver's memory and removes
/// uninit mark file on success.
///
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, true, true)
}
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
/// If `launch_wal_receiver` is false, the WAL receiver not launched, even though
/// timeline is initialized in Active state. This is used during tenant load and
/// attach, where the WAL receivers are launched only after all the timelines have
/// been initialized.
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
fn initialize_with_lock(
mut self,
ctx: &RequestContext,
_ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
activate: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
@@ -237,12 +228,6 @@ impl UninitializedTimeline<'_> {
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
if activate {
new_timeline
.activate(ctx)
.context("initializing timeline activation")?;
}
}
}
@@ -254,6 +239,7 @@ impl UninitializedTimeline<'_> {
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
@@ -279,7 +265,9 @@ impl UninitializedTimeline<'_> {
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, false, true)
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
tl.activate(broker_client, ctx)?;
Ok(tl)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -464,6 +452,34 @@ struct RemoteStartupData {
remote_metadata: TimelineMetadata,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
WillNotBecomeActive {
tenant_id: TenantId,
state: TenantState,
},
TenantDropped {
tenant_id: TenantId,
},
}
impl std::fmt::Display for WaitToBecomeActiveError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
write!(
f,
"Tenant {} will not become active. Current state: {:?}",
tenant_id, state
)
}
WaitToBecomeActiveError::TenantDropped { tenant_id } => {
write!(f, "Tenant {tenant_id} will not become active (dropped)")
}
}
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -519,7 +535,7 @@ impl Tenant {
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -599,6 +615,7 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -628,7 +645,12 @@ impl Tenant {
"attach tenant",
false,
async move {
match tenant_clone.attach(ctx).await {
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(_) => {}
Err(e) => {
tenant_clone.set_broken(e.to_string());
@@ -636,7 +658,12 @@ impl Tenant {
}
}
Ok(())
},
}
.instrument({
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
span.follows_from(Span::current());
span
}),
);
Ok(tenant)
}
@@ -644,8 +671,9 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
@@ -735,20 +763,14 @@ impl Tenant {
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
remote_client,
&ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
std::fs::remove_file(&marker_file)
@@ -758,10 +780,6 @@ impl Tenant {
utils::failpoint_sleep_millis_async!("attach-before-activate");
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(&ctx)?;
info!("Done");
Ok(())
@@ -867,6 +885,7 @@ impl Tenant {
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Arc<Tenant> {
@@ -901,7 +920,12 @@ impl Tenant {
"initial tenant load",
false,
async move {
match tenant_clone.load(&ctx).await {
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(()) => {}
Err(err) => {
tenant_clone.set_broken(err.to_string());
@@ -910,7 +934,12 @@ impl Tenant {
}
info!("initial load for tenant {tenant_id} finished!");
Ok(())
},
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
span.follows_from(Span::current());
span
}),
);
info!("spawned load into background");
@@ -922,8 +951,9 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
info!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -1039,10 +1069,6 @@ impl Tenant {
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(ctx)?;
info!("Done");
Ok(())
@@ -1206,6 +1232,27 @@ impl Tenant {
)
}
/// Helper for unit tests to create an emtpy timeline.
///
/// The timeline is has state value `Active` but its background loops are not running.
// This makes the various functions which anyhow::ensure! for Active state work in tests.
// Our current tests don't need the background loops.
#[cfg(test)]
pub fn create_test_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
let mut timelines = self.timelines.lock().unwrap();
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
}
/// Create a new timeline.
///
/// Returns the new timeline ID and reference to its Timeline object.
@@ -1219,6 +1266,7 @@ impl Tenant {
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
@@ -1285,6 +1333,8 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, ctx)?;
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
@@ -1588,7 +1638,11 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
@@ -1621,14 +1675,14 @@ impl Tenant {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self.tenant_id);
tasks::start_background_loops(self);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.activate(broker_client.clone(), ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {
@@ -1736,25 +1790,30 @@ impl Tenant {
self.state.subscribe()
}
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
let mut receiver = self.state.subscribe();
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await?;
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {
WaitToBecomeActiveError::TenantDropped {
tenant_id: self.tenant_id,
}
},
)?;
}
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { .. } | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
anyhow::bail!(
"Tenant {} will not become active. Current state: {:?}",
self.tenant_id,
&current_state,
);
return Err(WaitToBecomeActiveError::WillNotBecomeActive {
tenant_id: self.tenant_id,
state: current_state,
});
}
}
}
@@ -2278,13 +2337,45 @@ impl Tenant {
Ok(gc_timelines)
}
/// Branch an existing timeline
/// A substitute for `branch_timeline` for use in unit tests.
/// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
/// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
/// timeline background tasks are launched, except the flush loop.
#[cfg(test)]
async fn branch_timeline_test(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let tl = self
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await?;
tl.set_state(TimelineState::Active);
Ok(tl)
}
/// Branch an existing timeline.
///
/// The caller is responsible for activating the returned timeline.
async fn branch_timeline(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await
}
async fn branch_timeline_impl(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2378,7 +2469,7 @@ impl Tenant {
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true, true)?
.initialize_with_lock(ctx, &mut timelines, true)?
};
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2399,6 +2490,8 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
@@ -2493,7 +2586,7 @@ impl Tenant {
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
};
info!(
@@ -3134,8 +3227,14 @@ pub mod harness {
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
timelines_to_load.insert(timeline_id, timeline_metadata);
}
// FIXME starts background jobs
tenant.load(ctx).await?;
tenant
.load(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
tenant.state.send_replace(TenantState::Active);
for timeline in tenant.timelines.lock().unwrap().values() {
timeline.set_state(TimelineState::Active);
}
Ok(tenant)
}
@@ -3193,8 +3292,7 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3227,9 +3325,7 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
.load()
.await;
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx)?;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
Ok(_) => panic!("duplicate timeline creation should fail"),
@@ -3260,8 +3356,7 @@ mod tests {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
#[allow(non_snake_case)]
@@ -3283,7 +3378,7 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3358,8 +3453,7 @@ mod tests {
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load()
.await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3372,7 +3466,7 @@ mod tests {
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.await
{
Ok(_) => panic!("branching should have failed"),
@@ -3396,12 +3490,11 @@ mod tests {
.load()
.await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.await
{
Ok(_) => panic!("branching should have failed"),
@@ -3447,13 +3540,11 @@ mod tests {
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
.load()
.await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3497,12 +3588,11 @@ mod tests {
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
.load()
.await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3521,12 +3611,11 @@ mod tests {
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3555,8 +3644,7 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
}
@@ -3576,14 +3664,14 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
let child_tline = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
child_tline.set_state(TimelineState::Active);
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3613,9 +3701,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let (tenant, ctx) = harness.load().await;
tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
drop(tline);
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -3652,8 +3739,7 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3718,8 +3804,7 @@ mod tests {
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let mut lsn = Lsn(0x10);
@@ -3761,8 +3846,7 @@ mod tests {
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -3835,9 +3919,8 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
.load()
.await;
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -3870,7 +3953,7 @@ mod tests {
for _ in 0..50 {
let new_tline_id = TimelineId::generate();
tenant
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
.await?;
tline = tenant
.get_timeline(new_tline_id, true)
@@ -3919,9 +4002,8 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
.load()
.await;
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;
@@ -3936,7 +4018,7 @@ mod tests {
for idx in 0..NUM_TLINES {
let new_tline_id = TimelineId::generate();
tenant
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
.await?;
tline = tenant
.get_timeline(new_tline_id, true)

View File

@@ -32,46 +32,23 @@ enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_insert`].
Open(HashMap<TenantId, OpenTenantsMapEntry>),
Open(HashMap<TenantId, Arc<Tenant>>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
}
enum OpenTenantsMapEntry {
InterestOnly(tokio::sync::broadcast::Sender<Arc<Tenant>>),
Available(Arc<Tenant>),
}
impl TenantsMap {
/// Return a ref to a tenant if it exists in the map.
/// If there's just interest in a tenant, but the tenant does not exist,
/// this function returns `None`.
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) => match m.get(tenant_id) {
Some(tme) => match tme {
OpenTenantsMapEntry::InterestOnly(_) => None,
OpenTenantsMapEntry::Available(t) => Some(t),
},
None => None,
},
TenantsMap::ShuttingDown(m) => match m.get(tenant_id) {
Some(t) => Some(t),
None => None,
},
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
}
}
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) => match m.remove(tenant_id) {
None => None,
Some(OpenTenantsMapEntry::InterestOnly(_)) => None,
Some(OpenTenantsMapEntry::Available(tenant)) => Some(tenant),
},
TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
}
}
}
@@ -81,9 +58,10 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip(conf, remote_storage))]
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -139,12 +117,12 @@ pub async fn init_tenant_mgr(
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
&ctx,
) {
Ok(tenant) => {
tenants
.insert(tenant.tenant_id(), OpenTenantsMapEntry::Available(tenant));
tenants.insert(tenant.tenant_id(), tenant);
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -174,6 +152,7 @@ pub async fn init_tenant_mgr(
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -210,7 +189,7 @@ pub fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx) {
match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -228,7 +207,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, remote_storage, ctx)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
};
Ok(tenant)
}
@@ -254,18 +233,8 @@ pub async fn shutdown_all_tenants() {
return;
}
TenantsMap::Open(tenants) => {
let tenants: HashMap<TenantId, Arc<Tenant>> = std::mem::take(tenants)
.into_iter()
.filter_map(|(tenant_id, tme)| {
match tme {
// don't give the waiters any more hope
OpenTenantsMapEntry::InterestOnly(_) => None,
OpenTenantsMapEntry::Available(tenant) => Some((tenant_id, tenant)),
}
})
.collect();
let tenants_clone = tenants.clone();
*m = TenantsMap::ShuttingDown(tenants);
*m = TenantsMap::ShuttingDown(std::mem::take(tenants));
tenants_clone
}
TenantsMap::ShuttingDown(_) => {
@@ -309,6 +278,7 @@ pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
@@ -321,7 +291,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -334,11 +304,19 @@ pub async fn create_tenant(
}).await
}
#[derive(Debug, thiserror::Error)]
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
) -> Result<(), SetNewTenantConfigError> {
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
@@ -348,97 +326,41 @@ pub async fn set_new_tenant_config(
&tenant_config_path,
new_tenant_conf,
false,
)?;
)
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, TenantStateError> {
) -> Result<Arc<Tenant>, GetTenantError> {
let m = TENANTS.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(TenantStateError::NotFound(tenant_id))?;
.ok_or(GetTenantError::NotFound(tenant_id))?;
if active_only && !tenant.is_active() {
Err(TenantStateError::NotActive(tenant_id))
Err(GetTenantError::NotActive(tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitForActiveTenantError {
#[error("Tenant map is still initializing")]
Initializing,
#[error("Tenant map is shutting down")]
ShuttingDown,
#[error(transparent)]
Tenant(anyhow::Error),
}
/// Cancellation-Safety: This function is cancellation-safe.
pub(crate) async fn wait_for_active_tenant(
tenant_id: TenantId,
) -> Result<Arc<Tenant>, WaitForActiveTenantError> {
let tenant = loop {
let r_guard = TENANTS.read().await;
let m = match &*r_guard {
TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
TenantsMap::Open(m) => m,
};
if let Some(tme) = m.get(&tenant_id) {
match tme {
OpenTenantsMapEntry::InterestOnly(tx) => {
let mut rx = tx.subscribe();
drop(r_guard);
break rx
.recv()
.await
.map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
}
OpenTenantsMapEntry::Available(tenant) => break Arc::clone(tenant),
}
}
// TODO: would be nice to have upgradeable rwlock guard here
drop(r_guard);
let mut w_guard = TENANTS.write().await;
let m = match &mut *w_guard {
TenantsMap::Initializing => return Err(WaitForActiveTenantError::Initializing),
TenantsMap::ShuttingDown(_) => return Err(WaitForActiveTenantError::ShuttingDown),
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(_e) => {
// the world changed while we weren't holding the lock
continue;
}
hash_map::Entry::Vacant(e) => {
let (tx, mut rx) = tokio::sync::broadcast::channel(1);
e.insert(OpenTenantsMapEntry::InterestOnly(tx));
drop(w_guard);
break rx
.recv()
.await
.map_err(|_| WaitForActiveTenantError::ShuttingDown)?;
}
}
};
tenant
.wait_to_become_active()
.await
.map_err(WaitForActiveTenantError::Tenant)?;
Ok(tenant)
}
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] TenantStateError),
Tenant(#[from] GetTenantError),
#[error("Timeline {0}")]
Timeline(#[from] crate::tenant::DeleteTimelineError),
@@ -503,6 +425,7 @@ pub async fn detach_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -514,7 +437,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -554,20 +477,13 @@ pub enum TenantMapListError {
///
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
match &*tenants {
TenantsMap::Initializing => Err(TenantMapListError::Initializing),
TenantsMap::Open(m) => Ok(m
.iter()
.filter_map(|(id, tme)| match tme {
OpenTenantsMapEntry::InterestOnly(_) => None,
OpenTenantsMapEntry::Available(t) => Some((*id, t.current_state())),
})
.collect()),
TenantsMap::ShuttingDown(m) => Ok(m
.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect()),
}
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect())
}
/// Execute Attach mgmt API command.
@@ -578,6 +494,7 @@ pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -593,7 +510,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -640,29 +557,17 @@ where
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(mut e) => match e.get() {
OpenTenantsMapEntry::Available(tenant) => Err(
TenantMapInsertError::TenantAlreadyExists(tenant_id, tenant.current_state()),
),
OpenTenantsMapEntry::InterestOnly(v) => {
let tenant = match insert_fn() {
Ok(v) => v,
Err(e) => return Err(TenantMapInsertError::Closure(e)),
};
// ignore if the interest has gone away
let _ = v.send(Arc::clone(&tenant));
e.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Vacant(v) => match insert_fn() {
Ok(tenant) => {
v.insert(tenant.clone());
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
},
hash_map::Entry::Vacant(v) => {
let tenant = match insert_fn() {
Ok(v) => v,
Err(e) => return Err(TenantMapInsertError::Closure(e)),
};
v.insert(OpenTenantsMapEntry::Available(Arc::clone(&tenant)));
Ok(tenant)
}
}
}

View File

@@ -1264,9 +1264,7 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -542,7 +542,7 @@ impl From<LayerFileName> for LayerDescriptor {
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.

View File

@@ -110,7 +110,7 @@ const WILL_INIT: u64 = 1;
/// reading/deserializing records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
struct BlobRef(u64);
pub struct BlobRef(pub u64);
impl BlobRef {
pub fn will_init(&self) -> bool {
@@ -619,7 +619,7 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);

View File

@@ -422,7 +422,7 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);

View File

@@ -1,18 +1,20 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantId;
pub fn start_background_loops(tenant_id: TenantId) {
pub fn start_background_loops(tenant: &Arc<Tenant>) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -20,11 +22,14 @@ pub fn start_background_loops(tenant_id: TenantId) {
None,
&format!("compactor for tenant {tenant_id}"),
false,
async move {
compaction_loop(tenant_id)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
{
let tenant = Arc::clone(tenant);
async move {
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
},
);
task_mgr::spawn(
@@ -34,11 +39,14 @@ pub fn start_background_loops(tenant_id: TenantId) {
None,
&format!("garbage collector for tenant {tenant_id}"),
false,
async move {
gc_loop(tenant_id)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
{
let tenant = Arc::clone(tenant);
async move {
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
},
);
}
@@ -46,7 +54,7 @@ pub fn start_background_loops(tenant_id: TenantId) {
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant_id: TenantId) {
async fn compaction_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -57,13 +65,16 @@ async fn compaction_loop(tenant_id: TenantId) {
loop {
trace!("waking up");
let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
Ok(tenant) => tenant, // let's have another iteration
Err(e) => {
info!("exiting: {e:#}");
break;
}
};
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
let period = tenant.get_compaction_period();
@@ -113,7 +124,7 @@ async fn compaction_loop(tenant_id: TenantId) {
///
/// GC task's main loop
///
async fn gc_loop(tenant_id: TenantId) {
async fn gc_loop(tenant: Arc<Tenant>) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -127,13 +138,16 @@ async fn gc_loop(tenant_id: TenantId) {
loop {
trace!("waking up");
let tenant = match mgr::wait_for_active_tenant(tenant_id).await {
Ok(tenant) => tenant, // let's have another iteration
Err(e) => {
info!("exiting: {e:#}");
break;
}
};
tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
let period = tenant.get_gc_period();
@@ -181,6 +195,36 @@ async fn gc_loop(tenant_id: TenantId) {
trace!("GC loop stopped.");
}
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
return ControlFlow::Break(());
}
}
}
}
}
#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;

View File

@@ -22,8 +22,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -32,7 +31,6 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -48,7 +46,7 @@ use crate::tenant::{
};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -123,6 +121,17 @@ pub struct Timeline {
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
/// Newly created image layer doesn't help to remove the delta layer, until the
/// newly created image layer falls off the PITR horizon. So on next GC cycle,
/// gc_timeline may still want the new image layer to be created. To avoid redundant
/// image layers creation we should check if image layer exists but beyond PITR horizon.
/// This is why we need remember GC cutoff LSN.
///
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
@@ -897,15 +906,12 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
pub fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.launch_wal_receiver(ctx, broker_client)?;
self.set_state(TimelineState::Active);
self.launch_eviction_task();
Ok(())
@@ -1354,6 +1360,7 @@ impl Timeline {
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
walreceiver,
@@ -2904,6 +2911,30 @@ impl Timeline {
let layers = self.layers.read().unwrap();
let mut max_deltas = 0;
{
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
if wanted.overlaps(&img_range) {
//
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
// but create_image_layers creates image layers at last-record-lsn.
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
// but the range is already covered by image layers at more recent LSNs. Before we
// create a new image layer, check if the range is already covered at more recent LSNs.
if !layers
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
{
debug!(
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
img_range.start, img_range.end, cutoff_lsn, lsn
);
return Ok(true);
}
}
}
}
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn)?;
@@ -3023,6 +3054,12 @@ impl Timeline {
image_layers.push(image_layer);
}
}
// All layers that the GC wanted us to create have now been created.
//
// It's possible that another GC cycle happened while we were compacting, and added
// something new to wanted_image_layers, and we now clear that before processing it.
// That's OK, because the next GC iteration will put it back in.
*self.wanted_image_layers.lock().unwrap() = None;
// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
@@ -3720,6 +3757,7 @@ impl Timeline {
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
// Scan all layers in the timeline (remote or on-disk).
//
@@ -3803,6 +3841,15 @@ impl Timeline {
"keeping {} because it is the latest layer",
l.filename().file_name()
);
// Collect delta key ranges that need image layers to allow garbage
// collecting the layers.
// It is not so obvious whether we need to propagate information only about
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
continue 'outer;
}
@@ -3815,6 +3862,10 @@ impl Timeline {
);
layers_to_remove.push(Arc::clone(&l));
}
self.wanted_image_layers
.lock()
.unwrap()
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
let mut updates = layers.batch_update();
if !layers_to_remove.is_empty() {

View File

@@ -1309,9 +1309,8 @@ mod tests {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
ConnectionManagerState {
id: TenantTimelineId {

View File

@@ -11,10 +11,12 @@ OBJS = \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o
walproposer_utils.o \
control_plane_connector.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql

View File

@@ -0,0 +1,830 @@
/*-------------------------------------------------------------------------
*
* control_plane_connector.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control ProcessUtility_hook. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
* transaction commits. Forwarding may be disabled temporarily by
* setting neon.forward_ddl to false.
*
* Currently, the transaction may abort AFTER
* changes have already been forwarded, and that case is not handled.
* Subtransactions are handled using a stack of hash tables, which
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/* GUCs */
static char *ConsoleURL = NULL;
static bool ForwardDDL = true;
/* Curl structures for sending the HTTP requests */
static CURL * CurlHandle;
static struct curl_slist *ContentHeader = NULL;
/*
* CURL docs say that this buffer must exist until we call curl_easy_cleanup
* (which we never do), so we make this a static
*/
static char CurlErrorBuf[CURL_ERROR_SIZE];
typedef enum
{
Op_Set, /* An upsert: Either a creation or an alter */
Op_Delete,
} OpType;
typedef struct
{
char name[NAMEDATALEN];
Oid owner;
char old_name[NAMEDATALEN];
OpType type;
} DbEntry;
typedef struct
{
char name[NAMEDATALEN];
char old_name[NAMEDATALEN];
const char *password;
OpType type;
} RoleEntry;
/*
* We keep one of these for each subtransaction in a stack. When a subtransaction
* commits, we merge the top of the stack into the table below it. It is allocated in the
* subtransaction's context.
*/
typedef struct DdlHashTable
{
struct DdlHashTable *prev_table;
HTAB *db_table;
HTAB *role_table;
} DdlHashTable;
static DdlHashTable RootTable;
static DdlHashTable * CurrentDdlTable = &RootTable;
static void
PushKeyValue(JsonbParseState **state, char *key, char *value)
{
JsonbValue k,
v;
k.type = jbvString;
k.val.string.len = strlen(key);
k.val.string.val = key;
v.type = jbvString;
v.val.string.len = strlen(value);
v.val.string.val = value;
pushJsonbValue(state, WJB_KEY, &k);
pushJsonbValue(state, WJB_VALUE, &v);
}
static char *
ConstructDeltaMessage()
{
JsonbParseState *state = NULL;
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
if (RootTable.db_table)
{
JsonbValue dbs;
dbs.type = jbvString;
dbs.val.string.val = "dbs";
dbs.val.string.len = strlen(dbs.val.string.val);
pushJsonbValue(&state, WJB_KEY, &dbs);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
DbEntry *entry;
hash_seq_init(&status, RootTable.db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->owner != InvalidOid)
{
PushKeyValue(&state, "owner", GetUserNameFromId(entry->owner, false));
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
if (RootTable.role_table)
{
JsonbValue roles;
roles.type = jbvString;
roles.val.string.val = "roles";
roles.val.string.len = strlen(roles.val.string.val);
pushJsonbValue(&state, WJB_KEY, &roles);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
RoleEntry *entry;
hash_seq_init(&status, RootTable.role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->password)
{
PushKeyValue(&state, "password", (char *) entry->password);
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
JsonbValue *result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
Jsonb *jsonb = JsonbValueToJsonb(result);
return JsonbToCString(NULL, &jsonb->root, 0 /* estimated_len */ );
}
#define ERROR_SIZE 1024
typedef struct
{
char str[ERROR_SIZE];
size_t size;
} ErrorString;
static size_t
ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
/* Docs say size is always 1 */
ErrorString *str = userdata;
size_t to_write = nmemb;
/* +1 for null terminator */
if (str->size + nmemb + 1 >= ERROR_SIZE)
to_write = ERROR_SIZE - str->size - 1;
/* Ignore everyrthing past the first ERROR_SIZE bytes */
if (to_write == 0)
return nmemb;
memcpy(str->str + str->size, ptr, to_write);
str->size += to_write;
str->str[str->size] = '\0';
return nmemb;
}
static void
SendDeltasToControlPlane()
{
if (!RootTable.db_table && !RootTable.role_table)
return;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
if (!ForwardDDL)
return;
char *message = ConstructDeltaMessage();
ErrorString str = {};
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(CurlHandle, CURLOPT_HTTPHEADER, ContentHeader);
curl_easy_setopt(CurlHandle, CURLOPT_POSTFIELDS, message);
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &str);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
const int num_retries = 5;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == 0)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != 0)
{
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
bool error_exists = str.size != 0;
if (response_code != 200)
{
if (error_exists)
{
elog(ERROR,
"Received HTTP code %ld from control plane: %s",
response_code,
str.str);
}
else
{
elog(ERROR,
"Received HTTP code %ld from control plane",
response_code);
}
}
}
}
}
static void
InitDbTableIfNeeded()
{
if (!CurrentDdlTable->db_table)
{
HASHCTL db_ctl = {};
db_ctl.keysize = NAMEDATALEN;
db_ctl.entrysize = sizeof(DbEntry);
db_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->db_table = hash_create(
"Dbs Created",
4,
&db_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
InitRoleTableIfNeeded()
{
if (!CurrentDdlTable->role_table)
{
HASHCTL role_ctl = {};
role_ctl.keysize = NAMEDATALEN;
role_ctl.entrysize = sizeof(RoleEntry);
role_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->role_table = hash_create(
"Roles Created",
4,
&role_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
PushTable()
{
DdlHashTable *new_table = MemoryContextAlloc(CurTransactionContext, sizeof(DdlHashTable));
new_table->prev_table = CurrentDdlTable;
new_table->role_table = NULL;
new_table->db_table = NULL;
CurrentDdlTable = new_table;
}
static void
MergeTable()
{
DdlHashTable *old_table = CurrentDdlTable;
CurrentDdlTable = old_table->prev_table;
if (old_table->db_table)
{
InitDbTableIfNeeded();
DbEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
DbEntry *to_write = hash_search(
CurrentDdlTable->db_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->owner != InvalidOid)
to_write->owner = entry->owner;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
if (entry->old_name[0] != '\0')
{
bool found_old = false;
DbEntry *old = hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->db_table);
}
if (old_table->role_table)
{
InitRoleTableIfNeeded();
RoleEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
RoleEntry *to_write = hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->password)
to_write->password = entry->password;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
if (entry->old_name[0] != '\0')
{
bool found_old = false;
RoleEntry *old = hash_search(
CurrentDdlTable->role_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(CurrentDdlTable->role_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->role_table);
}
}
static void
PopTable()
{
/*
* Current table gets freed because it is allocated in aborted
* subtransaction's memory context.
*/
CurrentDdlTable = CurrentDdlTable->prev_table;
}
static void
NeonSubXactCallback(
SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg)
{
switch (event)
{
case SUBXACT_EVENT_START_SUB:
return PushTable();
case SUBXACT_EVENT_COMMIT_SUB:
return MergeTable();
case SUBXACT_EVENT_ABORT_SUB:
return PopTable();
default:
return;
}
}
static void
NeonXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
{
SendDeltasToControlPlane();
}
RootTable.role_table = NULL;
RootTable.db_table = NULL;
Assert(CurrentDdlTable == &RootTable);
}
static void
HandleCreateDb(CreatedbStmt *stmt)
{
InitDbTableIfNeeded();
DefElem *downer = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "owner") == 0)
downer = defel;
}
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->type = Op_Set;
if (downer && downer->arg)
entry->owner = get_role_oid(defGetString(downer), false);
else
entry->owner = GetUserId();
}
static void
HandleAlterOwner(AlterOwnerStmt *stmt)
{
if (stmt->objectType != OBJECT_DATABASE)
return;
InitDbTableIfNeeded();
const char *name = strVal(stmt->object);
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
name,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
entry->type = Op_Set;
}
static void
HandleDbRename(RenameStmt *stmt)
{
Assert(stmt->renameType == OBJECT_DATABASE);
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_FIND,
&found);
DbEntry *entry_for_new_name = hash_search(
CurrentDdlTable->db_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->owner = entry->owner;
hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->owner = InvalidOid;
}
}
static void
HandleDropDb(DropdbStmt *stmt)
{
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->owner = InvalidOid;
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
}
static void
HandleCreateRole(CreateRoleStmt *stmt)
{
InitRoleTableIfNeeded();
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role,
HASH_ENTER,
&found);
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass && dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleAlterRole(AlterRoleStmt *stmt)
{
InitRoleTableIfNeeded();
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
/* We only care about updates to the password */
if (!dpass)
return;
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role->rolename,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleRoleRename(RenameStmt *stmt)
{
InitRoleTableIfNeeded();
Assert(stmt->renameType == OBJECT_ROLE);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->subname,
HASH_FIND,
&found);
RoleEntry *entry_for_new_name = hash_search(
CurrentDdlTable->role_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->password = entry->password;
hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->password = NULL;
}
}
static void
HandleDropRole(DropRoleStmt *stmt)
{
InitRoleTableIfNeeded();
ListCell *item;
foreach(item, stmt->roles)
{
RoleSpec *spec = lfirst(item);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
spec->rolename,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->password = NULL;
if (!found)
memset(entry->old_name, 0, sizeof(entry));
}
}
static void
HandleRename(RenameStmt *stmt)
{
if (stmt->renameType == OBJECT_DATABASE)
return HandleDbRename(stmt);
else if (stmt->renameType == OBJECT_ROLE)
return HandleRoleRename(stmt);
}
static void
NeonProcessUtility(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
HandleCreateDb(castNode(CreatedbStmt, parseTree));
break;
case T_AlterOwnerStmt:
HandleAlterOwner(castNode(AlterOwnerStmt, parseTree));
break;
case T_RenameStmt:
HandleRename(castNode(RenameStmt, parseTree));
break;
case T_DropdbStmt:
HandleDropDb(castNode(DropdbStmt, parseTree));
break;
case T_CreateRoleStmt:
HandleCreateRole(castNode(CreateRoleStmt, parseTree));
break;
case T_AlterRoleStmt:
HandleAlterRole(castNode(AlterRoleStmt, parseTree));
break;
case T_DropRoleStmt:
HandleDropRole(castNode(DropRoleStmt, parseTree));
break;
default:
break;
}
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
extern void
InitControlPlaneConnector()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, which will be forwarded changes to dbs and roles",
NULL,
&ConsoleURL,
NULL,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable(
"neon.forward_ddl",
"Controls whether to forward DDL to the control plane",
NULL,
&ForwardDDL,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
const char *jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
if (!jwt_token)
{
elog(LOG, "Missing NEON_CONTROL_PLANE_TOKEN environment variable, forwarding will not be authenticated");
}
if (curl_global_init(CURL_GLOBAL_DEFAULT))
{
elog(ERROR, "Failed to initialize curl");
}
if ((CurlHandle = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
if ((ContentHeader = curl_slist_append(ContentHeader, "Content-Type: application/json")) == NULL)
{
elog(ERROR, "Failed to initialize content header");
}
if (jwt_token)
{
char auth_header[8192];
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
if ((ContentHeader = curl_slist_append(ContentHeader, auth_header)) == NULL)
{
elog(ERROR, "Failed to initialize authorization header");
}
}
}

View File

@@ -0,0 +1,6 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
#endif

View File

@@ -25,6 +25,7 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -34,7 +35,11 @@ _PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
InitControlPlaneConnector();
// Important: This must happen after other parts of the extension
// are loaded, otherwise any settings to GUCs that were set before
// the extension was loaded will be removed.
EmitWarningsOnPlaceholders("neon");
}

313
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry and should not be changed by hand.
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -79,30 +79,30 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
version = "2.13.1"
version = "2.13.2"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "allure-pytest-2.13.1.tar.gz", hash = "sha256:68d69456eeb65af4061ec06a80bc941163b0616e8216554d36b070a6bf070e08"},
{file = "allure_pytest-2.13.1-py3-none-any.whl", hash = "sha256:a8de2fc3b3effe2d8f98801646920de3f055b779710f4c806dbee7c613c24633"},
{file = "allure-pytest-2.13.2.tar.gz", hash = "sha256:22243159e8ec81ce2b5254b4013802198821b1b42f118f69d4a289396607c7b3"},
{file = "allure_pytest-2.13.2-py3-none-any.whl", hash = "sha256:17de9dbee7f61c8e66a5b5e818b00e419dbcea44cb55c24319401ba813220690"},
]
[package.dependencies]
allure-python-commons = "2.13.1"
allure-python-commons = "2.13.2"
pytest = ">=4.5.0"
[[package]]
name = "allure-python-commons"
version = "2.13.1"
version = "2.13.2"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "allure-python-commons-2.13.1.tar.gz", hash = "sha256:3fc13e1da8ebb23f9ab5c9c72ad04595023cdd5078dbb8604939997faebed5cb"},
{file = "allure_python_commons-2.13.1-py3-none-any.whl", hash = "sha256:d08e04867bddf44fef55def3d67f4bc25af58a1bf9fcffcf4ec3331f7f2ef0d0"},
{file = "allure-python-commons-2.13.2.tar.gz", hash = "sha256:8a03681330231b1deadd86b97ff68841c6591320114ae638570f1ed60d7a2033"},
{file = "allure_python_commons-2.13.2-py3-none-any.whl", hash = "sha256:2bb3646ec3fbf5b36d178a5e735002bc130ae9f9ba80f080af97d368ba375051"},
]
[package.dependencies]
@@ -172,17 +172,6 @@ dev = ["Cython (>=0.29.24,<0.30.0)", "Sphinx (>=4.1.2,<4.2.0)", "flake8 (>=5.0.4
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"]
test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"]
[[package]]
name = "atomicwrites"
version = "1.4.1"
description = "Atomic file writes."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
{file = "atomicwrites-1.4.1.tar.gz", hash = "sha256:81b2c9071a49367a7f770170e5eec8cb66567cfbbc8c73d20ce5ca4a8d71cf11"},
]
[[package]]
name = "attrs"
version = "21.4.0"
@@ -239,49 +228,49 @@ wrapt = "*"
[[package]]
name = "backoff"
version = "1.11.1"
version = "2.2.1"
description = "Function decoration for backoff and retry"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
python-versions = ">=3.7,<4.0"
files = [
{file = "backoff-1.11.1-py2.py3-none-any.whl", hash = "sha256:61928f8fa48d52e4faa81875eecf308eccfb1016b018bb6bd21e05b5d90a96c5"},
{file = "backoff-1.11.1.tar.gz", hash = "sha256:ccb962a2378418c667b3c979b504fdeb7d9e0d29c0579e3b13b86467177728cb"},
{file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
{file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
]
[[package]]
name = "black"
version = "23.1.0"
version = "23.3.0"
description = "The uncompromising code formatter."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "black-23.1.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:b6a92a41ee34b883b359998f0c8e6eb8e99803aa8bf3123bf2b2e6fec505a221"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:57c18c5165c1dbe291d5306e53fb3988122890e57bd9b3dcb75f967f13411a26"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:9880d7d419bb7e709b37e28deb5e68a49227713b623c72b2b931028ea65f619b"},
{file = "black-23.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e6663f91b6feca5d06f2ccd49a10f254f9298cc1f7f49c46e498a0771b507104"},
{file = "black-23.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:9afd3f493666a0cd8f8df9a0200c6359ac53940cbde049dcb1a7eb6ee2dd7074"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:bfffba28dc52a58f04492181392ee380e95262af14ee01d4bc7bb1b1c6ca8d27"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:c1c476bc7b7d021321e7d93dc2cbd78ce103b84d5a4cf97ed535fbc0d6660648"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:382998821f58e5c8238d3166c492139573325287820963d2f7de4d518bd76958"},
{file = "black-23.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bf649fda611c8550ca9d7592b69f0637218c2369b7744694c5e4902873b2f3a"},
{file = "black-23.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:121ca7f10b4a01fd99951234abdbd97728e1240be89fde18480ffac16503d481"},
{file = "black-23.1.0-cp37-cp37m-macosx_10_16_x86_64.whl", hash = "sha256:a8471939da5e824b891b25751955be52ee7f8a30a916d570a5ba8e0f2eb2ecad"},
{file = "black-23.1.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8178318cb74f98bc571eef19068f6ab5613b3e59d4f47771582f04e175570ed8"},
{file = "black-23.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:a436e7881d33acaf2536c46a454bb964a50eff59b21b51c6ccf5a40601fbef24"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:a59db0a2094d2259c554676403fa2fac3473ccf1354c1c63eccf7ae65aac8ab6"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:0052dba51dec07ed029ed61b18183942043e00008ec65d5028814afaab9a22fd"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:49f7b39e30f326a34b5c9a4213213a6b221d7ae9d58ec70df1c4a307cf2a1580"},
{file = "black-23.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:162e37d49e93bd6eb6f1afc3e17a3d23a823042530c37c3c42eeeaf026f38468"},
{file = "black-23.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:8b70eb40a78dfac24842458476135f9b99ab952dd3f2dab738c1881a9b38b753"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:a29650759a6a0944e7cca036674655c2f0f63806ddecc45ed40b7b8aa314b651"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:bb460c8561c8c1bec7824ecbc3ce085eb50005883a6203dcfb0122e95797ee06"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:c91dfc2c2a4e50df0026f88d2215e166616e0c80e86004d0003ece0488db2739"},
{file = "black-23.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2a951cc83ab535d248c89f300eccbd625e80ab880fbcfb5ac8afb5f01a258ac9"},
{file = "black-23.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:0680d4380db3719ebcfb2613f34e86c8e6d15ffeabcf8ec59355c5e7b85bb555"},
{file = "black-23.1.0-py3-none-any.whl", hash = "sha256:7a0f701d314cfa0896b9001df70a530eb2472babb76086344e688829efd97d32"},
{file = "black-23.1.0.tar.gz", hash = "sha256:b0bd97bea8903f5a2ba7219257a44e3f1f9d00073d6cc1add68f0beec69692ac"},
{file = "black-23.3.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:0945e13506be58bf7db93ee5853243eb368ace1c08a24c65ce108986eac65915"},
{file = "black-23.3.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:67de8d0c209eb5b330cce2469503de11bca4085880d62f1628bd9972cc3366b9"},
{file = "black-23.3.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:7c3eb7cea23904399866c55826b31c1f55bbcd3890ce22ff70466b907b6775c2"},
{file = "black-23.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:32daa9783106c28815d05b724238e30718f34155653d4d6e125dc7daec8e260c"},
{file = "black-23.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:35d1381d7a22cc5b2be2f72c7dfdae4072a3336060635718cc7e1ede24221d6c"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:a8a968125d0a6a404842fa1bf0b349a568634f856aa08ffaff40ae0dfa52e7c6"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:c7ab5790333c448903c4b721b59c0d80b11fe5e9803d8703e84dcb8da56fec1b"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:a6f6886c9869d4daae2d1715ce34a19bbc4b95006d20ed785ca00fa03cba312d"},
{file = "black-23.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f3c333ea1dd6771b2d3777482429864f8e258899f6ff05826c3a4fcc5ce3f70"},
{file = "black-23.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:11c410f71b876f961d1de77b9699ad19f939094c3a677323f43d7a29855fe326"},
{file = "black-23.3.0-cp37-cp37m-macosx_10_16_x86_64.whl", hash = "sha256:1d06691f1eb8de91cd1b322f21e3bfc9efe0c7ca1f0e1eb1db44ea367dff656b"},
{file = "black-23.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:50cb33cac881766a5cd9913e10ff75b1e8eb71babf4c7104f2e9c52da1fb7de2"},
{file = "black-23.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:e114420bf26b90d4b9daa597351337762b63039752bdf72bf361364c1aa05925"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:48f9d345675bb7fbc3dd85821b12487e1b9a75242028adad0333ce36ed2a6d27"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:714290490c18fb0126baa0fca0a54ee795f7502b44177e1ce7624ba1c00f2331"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:064101748afa12ad2291c2b91c960be28b817c0c7eaa35bec09cc63aa56493c5"},
{file = "black-23.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:562bd3a70495facf56814293149e51aa1be9931567474993c7942ff7d3533961"},
{file = "black-23.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:e198cf27888ad6f4ff331ca1c48ffc038848ea9f031a3b40ba36aced7e22f2c8"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:3238f2aacf827d18d26db07524e44741233ae09a584273aa059066d644ca7b30"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:f0bd2f4a58d6666500542b26354978218a9babcdc972722f4bf90779524515f3"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:92c543f6854c28a3c7f39f4d9b7694f9a6eb9d3c5e2ece488c327b6e7ea9b266"},
{file = "black-23.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a150542a204124ed00683f0db1f5cf1c2aaaa9cc3495b7a3b5976fb136090ab"},
{file = "black-23.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:6b39abdfb402002b8a7d030ccc85cf5afff64ee90fa4c5aebc531e3ad0175ddb"},
{file = "black-23.3.0-py3-none-any.whl", hash = "sha256:ec751418022185b0c1bb7d7736e6933d40bbb14c14a0abcf9123d1b159f98dd4"},
{file = "black-23.3.0.tar.gz", hash = "sha256:1c7b8d606e728a41ea1ccbd7264677e494e87cf630e399262ced92d4a8dac940"},
]
[package.dependencies]
@@ -951,6 +940,21 @@ six = ">=1.9.0"
gmpy = ["gmpy"]
gmpy2 = ["gmpy2"]
[[package]]
name = "exceptiongroup"
version = "1.1.1"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "exceptiongroup-1.1.1-py3-none-any.whl", hash = "sha256:232c37c63e4f682982c8b6459f33a8981039e5fb8756b2074364e5055c498c9e"},
{file = "exceptiongroup-1.1.1.tar.gz", hash = "sha256:d484c3090ba2889ae2928419117447a14daf3c1231d5e30d0aae34f354f01785"},
]
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "execnet"
version = "1.9.0"
@@ -1410,38 +1414,38 @@ files = [
[[package]]
name = "mypy"
version = "1.1.1"
version = "1.3.0"
description = "Optional static typing for Python"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "mypy-1.1.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:39c7119335be05630611ee798cc982623b9e8f0cff04a0b48dfc26100e0b97af"},
{file = "mypy-1.1.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:61bf08362e93b6b12fad3eab68c4ea903a077b87c90ac06c11e3d7a09b56b9c1"},
{file = "mypy-1.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dbb19c9f662e41e474e0cff502b7064a7edc6764f5262b6cd91d698163196799"},
{file = "mypy-1.1.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:315ac73cc1cce4771c27d426b7ea558fb4e2836f89cb0296cbe056894e3a1f78"},
{file = "mypy-1.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:5cb14ff9919b7df3538590fc4d4c49a0f84392237cbf5f7a816b4161c061829e"},
{file = "mypy-1.1.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:26cdd6a22b9b40b2fd71881a8a4f34b4d7914c679f154f43385ca878a8297389"},
{file = "mypy-1.1.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:5b5f81b40d94c785f288948c16e1f2da37203c6006546c5d947aab6f90aefef2"},
{file = "mypy-1.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21b437be1c02712a605591e1ed1d858aba681757a1e55fe678a15c2244cd68a5"},
{file = "mypy-1.1.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d809f88734f44a0d44959d795b1e6f64b2bbe0ea4d9cc4776aa588bb4229fc1c"},
{file = "mypy-1.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:a380c041db500e1410bb5b16b3c1c35e61e773a5c3517926b81dfdab7582be54"},
{file = "mypy-1.1.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b7c7b708fe9a871a96626d61912e3f4ddd365bf7f39128362bc50cbd74a634d5"},
{file = "mypy-1.1.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1c10fa12df1232c936830839e2e935d090fc9ee315744ac33b8a32216b93707"},
{file = "mypy-1.1.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:0a28a76785bf57655a8ea5eb0540a15b0e781c807b5aa798bd463779988fa1d5"},
{file = "mypy-1.1.1-cp37-cp37m-win_amd64.whl", hash = "sha256:ef6a01e563ec6a4940784c574d33f6ac1943864634517984471642908b30b6f7"},
{file = "mypy-1.1.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d64c28e03ce40d5303450f547e07418c64c241669ab20610f273c9e6290b4b0b"},
{file = "mypy-1.1.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:64cc3afb3e9e71a79d06e3ed24bb508a6d66f782aff7e56f628bf35ba2e0ba51"},
{file = "mypy-1.1.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce61663faf7a8e5ec6f456857bfbcec2901fbdb3ad958b778403f63b9e606a1b"},
{file = "mypy-1.1.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:2b0c373d071593deefbcdd87ec8db91ea13bd8f1328d44947e88beae21e8d5e9"},
{file = "mypy-1.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:2888ce4fe5aae5a673386fa232473014056967f3904f5abfcf6367b5af1f612a"},
{file = "mypy-1.1.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:19ba15f9627a5723e522d007fe708007bae52b93faab00f95d72f03e1afa9598"},
{file = "mypy-1.1.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:59bbd71e5c58eed2e992ce6523180e03c221dcd92b52f0e792f291d67b15a71c"},
{file = "mypy-1.1.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9401e33814cec6aec8c03a9548e9385e0e228fc1b8b0a37b9ea21038e64cdd8a"},
{file = "mypy-1.1.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4b398d8b1f4fba0e3c6463e02f8ad3346f71956b92287af22c9b12c3ec965a9f"},
{file = "mypy-1.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:69b35d1dcb5707382810765ed34da9db47e7f95b3528334a3c999b0c90fe523f"},
{file = "mypy-1.1.1-py3-none-any.whl", hash = "sha256:4e4e8b362cdf99ba00c2b218036002bdcdf1e0de085cdb296a49df03fb31dfc4"},
{file = "mypy-1.1.1.tar.gz", hash = "sha256:ae9ceae0f5b9059f33dbc62dea087e942c0ccab4b7a003719cb70f9b8abfa32f"},
{file = "mypy-1.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c1eb485cea53f4f5284e5baf92902cd0088b24984f4209e25981cc359d64448d"},
{file = "mypy-1.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4c99c3ecf223cf2952638da9cd82793d8f3c0c5fa8b6ae2b2d9ed1e1ff51ba85"},
{file = "mypy-1.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:550a8b3a19bb6589679a7c3c31f64312e7ff482a816c96e0cecec9ad3a7564dd"},
{file = "mypy-1.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:cbc07246253b9e3d7d74c9ff948cd0fd7a71afcc2b77c7f0a59c26e9395cb152"},
{file = "mypy-1.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:a22435632710a4fcf8acf86cbd0d69f68ac389a3892cb23fbad176d1cddaf228"},
{file = "mypy-1.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6e33bb8b2613614a33dff70565f4c803f889ebd2f859466e42b46e1df76018dd"},
{file = "mypy-1.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7d23370d2a6b7a71dc65d1266f9a34e4cde9e8e21511322415db4b26f46f6b8c"},
{file = "mypy-1.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:658fe7b674769a0770d4b26cb4d6f005e88a442fe82446f020be8e5f5efb2fae"},
{file = "mypy-1.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6e42d29e324cdda61daaec2336c42512e59c7c375340bd202efa1fe0f7b8f8ca"},
{file = "mypy-1.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:d0b6c62206e04061e27009481cb0ec966f7d6172b5b936f3ead3d74f29fe3dcf"},
{file = "mypy-1.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:76ec771e2342f1b558c36d49900dfe81d140361dd0d2df6cd71b3db1be155409"},
{file = "mypy-1.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ebc95f8386314272bbc817026f8ce8f4f0d2ef7ae44f947c4664efac9adec929"},
{file = "mypy-1.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:faff86aa10c1aa4a10e1a301de160f3d8fc8703b88c7e98de46b531ff1276a9a"},
{file = "mypy-1.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:8c5979d0deb27e0f4479bee18ea0f83732a893e81b78e62e2dda3e7e518c92ee"},
{file = "mypy-1.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c5d2cc54175bab47011b09688b418db71403aefad07cbcd62d44010543fc143f"},
{file = "mypy-1.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:87df44954c31d86df96c8bd6e80dfcd773473e877ac6176a8e29898bfb3501cb"},
{file = "mypy-1.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:473117e310febe632ddf10e745a355714e771ffe534f06db40702775056614c4"},
{file = "mypy-1.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:74bc9b6e0e79808bf8678d7678b2ae3736ea72d56eede3820bd3849823e7f305"},
{file = "mypy-1.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:44797d031a41516fcf5cbfa652265bb994e53e51994c1bd649ffcd0c3a7eccbf"},
{file = "mypy-1.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ddae0f39ca146972ff6bb4399f3b2943884a774b8771ea0a8f50e971f5ea5ba8"},
{file = "mypy-1.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1c4c42c60a8103ead4c1c060ac3cdd3ff01e18fddce6f1016e08939647a0e703"},
{file = "mypy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e86c2c6852f62f8f2b24cb7a613ebe8e0c7dc1402c61d36a609174f63e0ff017"},
{file = "mypy-1.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:f9dca1e257d4cc129517779226753dbefb4f2266c4eaad610fc15c6a7e14283e"},
{file = "mypy-1.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:95d8d31a7713510685b05fbb18d6ac287a56c8f6554d88c19e73f724a445448a"},
{file = "mypy-1.3.0-py3-none-any.whl", hash = "sha256:a8763e72d5d9574d45ce5881962bc8e9046bf7b375b0abf031f3e6811732a897"},
{file = "mypy-1.3.0.tar.gz", hash = "sha256:e1f4d16e296f5135624b34e8fb741eb0eadedca90862405b1f1fde2040b9bd11"},
]
[package.dependencies]
@@ -1721,18 +1725,6 @@ files = [
{file = "psycopg2_binary-2.9.3-cp39-cp39-win_amd64.whl", hash = "sha256:accfe7e982411da3178ec690baaceaad3c278652998b2c45828aaac66cd8285f"},
]
[[package]]
name = "py"
version = "1.11.0"
description = "library with cross-python path, ini-parsing, io, code, log facilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
{file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
]
[[package]]
name = "pyasn1"
version = "0.4.8"
@@ -1841,57 +1833,56 @@ files = [
[[package]]
name = "pytest"
version = "6.2.5"
version = "7.3.1"
description = "pytest: simple powerful testing with Python"
category = "main"
optional = false
python-versions = ">=3.6"
python-versions = ">=3.7"
files = [
{file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"},
{file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"},
{file = "pytest-7.3.1-py3-none-any.whl", hash = "sha256:3799fa815351fea3a5e96ac7e503a96fa51cc9942c3753cda7651b93c1cfa362"},
{file = "pytest-7.3.1.tar.gz", hash = "sha256:434afafd78b1d78ed0addf160ad2b77a30d35d4bdf8af234fe621919d9ed15e3"},
]
[package.dependencies]
atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""}
attrs = ">=19.2.0"
colorama = {version = "*", markers = "sys_platform == \"win32\""}
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
iniconfig = "*"
packaging = "*"
pluggy = ">=0.12,<2.0"
py = ">=1.8.2"
toml = "*"
tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""}
[package.extras]
testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"]
testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"]
[[package]]
name = "pytest-asyncio"
version = "0.19.0"
version = "0.21.0"
description = "Pytest support for asyncio"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest-asyncio-0.19.0.tar.gz", hash = "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed"},
{file = "pytest_asyncio-0.19.0-py3-none-any.whl", hash = "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa"},
{file = "pytest-asyncio-0.21.0.tar.gz", hash = "sha256:2b38a496aef56f56b0e87557ec313e11e1ab9276fc3863f6a7be0f1d0e415e1b"},
{file = "pytest_asyncio-0.21.0-py3-none-any.whl", hash = "sha256:f2b3366b7cd501a4056858bd39349d5af19742aed2d81660b7998b6341c7eb9c"},
]
[package.dependencies]
pytest = ">=6.1.0"
pytest = ">=7.0.0"
[package.extras]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
[[package]]
name = "pytest-httpserver"
version = "1.0.6"
version = "1.0.8"
description = "pytest-httpserver is a httpserver for pytest"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
python-versions = ">=3.8,<4.0"
files = [
{file = "pytest_httpserver-1.0.6-py3-none-any.whl", hash = "sha256:ac2379acc91fe8bdbe2911c93af8dd130e33b5899fb9934d15669480739c6d32"},
{file = "pytest_httpserver-1.0.6.tar.gz", hash = "sha256:9040d07bf59ac45d8de3db1d4468fd2d1d607975e4da4c872ecc0402cdbf7b3e"},
{file = "pytest_httpserver-1.0.8-py3-none-any.whl", hash = "sha256:24cd3d9f6a0b927c7bfc400d0b3fda7442721b8267ce29942bf307b190f0bb09"},
{file = "pytest_httpserver-1.0.8.tar.gz", hash = "sha256:e052f69bc8a9073db02484681e8e47004dd1fb3763b0ae833bd899e5895c559a"},
]
[package.dependencies]
@@ -1914,14 +1905,14 @@ pytest = ">=3.2.5"
[[package]]
name = "pytest-order"
version = "1.0.1"
version = "1.1.0"
description = "pytest plugin to run your tests in a specific order"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "pytest-order-1.0.1.tar.gz", hash = "sha256:5dd6b929fbd7eaa6d0ee07586f65c623babb0afe72b4843c5f15055d6b3b1b1f"},
{file = "pytest_order-1.0.1-py3-none-any.whl", hash = "sha256:bbe6e63a8e23741ab3e810d458d1ea7317e797b70f9550512d77d6e9e8fd1bbb"},
{file = "pytest-order-1.1.0.tar.gz", hash = "sha256:139d25b30826b78eebb42722f747eab14c44b88059d7a71d4f79d14a057269a5"},
{file = "pytest_order-1.1.0-py3-none-any.whl", hash = "sha256:3b3730969c97900fa5cd31ecff80847680ed56b2490954565c14949ba60d9371"},
]
[package.dependencies]
@@ -1963,14 +1954,14 @@ pytest = ">=5.0.0"
[[package]]
name = "pytest-xdist"
version = "3.0.2"
description = "pytest xdist plugin for distributed testing and loop-on-failing modes"
version = "3.3.1"
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
category = "main"
optional = false
python-versions = ">=3.6"
python-versions = ">=3.7"
files = [
{file = "pytest-xdist-3.0.2.tar.gz", hash = "sha256:688da9b814370e891ba5de650c9327d1a9d861721a524eb917e620eec3e90291"},
{file = "pytest_xdist-3.0.2-py3-none-any.whl", hash = "sha256:9feb9a18e1790696ea23e1434fa73b325ed4998b0e9fcb221f16fd1945e6df1b"},
{file = "pytest-xdist-3.3.1.tar.gz", hash = "sha256:d5ee0520eb1b7bcca50a60a518ab7a7707992812c578198f8b44fdfac78e8c93"},
{file = "pytest_xdist-3.3.1-py3-none-any.whl", hash = "sha256:ff9daa7793569e6a68544850fd3927cd257cc03a7ef76c95e86915355e82b5f2"},
]
[package.dependencies]
@@ -2092,21 +2083,21 @@ files = [
[[package]]
name = "requests"
version = "2.28.1"
version = "2.31.0"
description = "Python HTTP for Humans."
category = "main"
optional = false
python-versions = ">=3.7, <4"
python-versions = ">=3.7"
files = [
{file = "requests-2.28.1-py3-none-any.whl", hash = "sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349"},
{file = "requests-2.28.1.tar.gz", hash = "sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983"},
{file = "requests-2.31.0-py3-none-any.whl", hash = "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f"},
{file = "requests-2.31.0.tar.gz", hash = "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1"},
]
[package.dependencies]
certifi = ">=2017.4.17"
charset-normalizer = ">=2,<3"
charset-normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<1.27"
urllib3 = ">=1.21.1,<3"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
@@ -2148,29 +2139,29 @@ pyasn1 = ">=0.1.3"
[[package]]
name = "ruff"
version = "0.0.255"
version = "0.0.269"
description = "An extremely fast Python linter, written in Rust."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.0.255-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:b2d71fb6a7e50501a2473864acffc85dee6b750c25db198f7e71fe1dbbff1aad"},
{file = "ruff-0.0.255-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:6c97d746861a6010f941179e84bba9feb8a871815667471d9ed6beb98d45c252"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9a7fa60085079b91a298b963361be9b1b1c724582af6c84be954cbabdbd9309a"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c089f7141496334ab5a127b54ce55e41f0d6714e68a4453a1e09d2204cdea8c3"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0423908caa7d437a416b853214565b9c33bbd1106c4f88147982216dddcbbd96"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:981493e92547cacbb8e0874904ec049fe744507ee890dc8736caf89a8864f9a7"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:59d5193d2aedb35db180824462b374dbcfc306b2e76076245088afa6e5837df2"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dd5e00733c9d160c8a34a22e62b390da9d1e9f326676402421cb8c1236beefc3"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:694418cf41838bd19c6229e4e1b2d04505b1e6b86fe3ab81165484fc96d36f01"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5d0408985c9777369daebb5d3340a99e9f7294bdd7120642239261508185cf89"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:abd6376ef9d12f370d95a8c7c98682fbb9bfedfba59f40e84a816fef8ddcb8de"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f9b1a5df0bc09193cbef58a6f78e4a9a0b058a4f9733c0442866d078006d1bb9"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6a25c5f4ff087445b2e1bbcb9963f2ae7c868d65e4a8d5f84c36c12f71571179"},
{file = "ruff-0.0.255-py3-none-win32.whl", hash = "sha256:1ff87a8310354f9f1a099625e54a27fdd6756d9cd2a40b45922f2e943daf982d"},
{file = "ruff-0.0.255-py3-none-win_amd64.whl", hash = "sha256:f3d8416be618f023f93ec4fd6ee3048585ef85dba9563b2a7e38fc7e5131d5b1"},
{file = "ruff-0.0.255-py3-none-win_arm64.whl", hash = "sha256:8ba124819624145d7b6b53add40c367c44318893215ffc1bfe3d72e0225a1c9c"},
{file = "ruff-0.0.255.tar.gz", hash = "sha256:f9eb1d3b2eecbeedae419fa494c4e2a5e4484baf93a1ce0f81eddb005e1919c5"},
{file = "ruff-0.0.269-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:3569bcdee679045c09c0161fabc057599759c49219a08d9a4aad2cc3982ccba3"},
{file = "ruff-0.0.269-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:56347da63757a56cbce7d4b3d6044ca4f1941cd1bbff3714f7554360c3361f83"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6da8ee25ef2f0cc6cc8e6e20942c1d44d25a36dce35070d7184655bc14f63f63"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bd81b8e681b9eaa6cf15484f3985bd8bd97c3d114e95bff3e8ea283bf8865062"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f19f59ca3c28742955241fb452f3346241ddbd34e72ac5cb3d84fadebcf6bc8"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:f062059b8289a4fab7f6064601b811d447c2f9d3d432a17f689efe4d68988450"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3f5dc7aac52c58e82510217e3c7efd80765c134c097c2815d59e40face0d1fe6"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e131b4dbe798c391090c6407641d6ab12c0fa1bb952379dde45e5000e208dabb"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a374434e588e06550df0f8dcb74777290f285678de991fda4e1063c367ab2eb2"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:cec2f4b84a14b87f1b121488649eb5b4eaa06467a2387373f750da74bdcb5679"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:374b161753a247904aec7a32d45e165302b76b6e83d22d099bf3ff7c232c888f"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_i686.whl", hash = "sha256:9ca0a1ddb1d835b5f742db9711c6cf59f213a1ad0088cb1e924a005fd399e7d8"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:5a20658f0b97d207c7841c13d528f36d666bf445b00b01139f28a8ccb80093bb"},
{file = "ruff-0.0.269-py3-none-win32.whl", hash = "sha256:03ff42bc91ceca58e0f0f072cb3f9286a9208f609812753474e799a997cdad1a"},
{file = "ruff-0.0.269-py3-none-win_amd64.whl", hash = "sha256:f3b59ccff57b21ef0967ea8021fd187ec14c528ec65507d8bcbe035912050776"},
{file = "ruff-0.0.269-py3-none-win_arm64.whl", hash = "sha256:bbeb857b1e508a4487bdb02ca1e6d41dd8d5ac5335a5246e25de8a3dff38c1ff"},
{file = "ruff-0.0.269.tar.gz", hash = "sha256:11ddcfbab32cf5c420ea9dd5531170ace5a3e59c16d9251c7bd2581f7b16f602"},
]
[[package]]
@@ -2271,7 +2262,7 @@ files = [
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "dev"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2281,42 +2272,54 @@ files = [
[[package]]
name = "types-psutil"
version = "5.9.5.4"
version = "5.9.5.12"
description = "Typing stubs for psutil"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-psutil-5.9.5.4.tar.gz", hash = "sha256:aa09102b80c65a3b4573216614372398dab78972d650488eaff1ff05482cc18f"},
{file = "types_psutil-5.9.5.4-py3-none-any.whl", hash = "sha256:28e59764630187e462d43788efa16d59d5e77b510115f9e25901b2d4007fca62"},
{file = "types-psutil-5.9.5.12.tar.gz", hash = "sha256:61a91679d3fe737250013b624dca09375e7cc3ad77dcc734553746c429c02aca"},
{file = "types_psutil-5.9.5.12-py3-none-any.whl", hash = "sha256:e9a147b8561235c6afcce5aa1adb973fad9ab2c50cf89820697687f53510358f"},
]
[[package]]
name = "types-psycopg2"
version = "2.9.18"
version = "2.9.21.10"
description = "Typing stubs for psycopg2"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-psycopg2-2.9.18.tar.gz", hash = "sha256:9b0e9e1f097b15cd9fa8aad2596a9e3082fd72f8d9cfe52b190cfa709105b6c0"},
{file = "types_psycopg2-2.9.18-py3-none-any.whl", hash = "sha256:14c779dcab18c31453fa1cad3cf4b1601d33540a344adead3c47a6b8091cd2fa"},
{file = "types-psycopg2-2.9.21.10.tar.gz", hash = "sha256:c2600892312ae1c34e12f145749795d93dc4eac3ef7dbf8a9c1bfd45385e80d7"},
{file = "types_psycopg2-2.9.21.10-py3-none-any.whl", hash = "sha256:918224a0731a3650832e46633e720703b5beef7693a064e777d9748654fcf5e5"},
]
[[package]]
name = "types-pytest-lazy-fixture"
version = "0.6.3.3"
description = "Typing stubs for pytest-lazy-fixture"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-pytest-lazy-fixture-0.6.3.3.tar.gz", hash = "sha256:2ef79d66bcde0e50acdac8dc55074b9ae0d4cfaeabdd638f5522f4cac7c8a2c7"},
{file = "types_pytest_lazy_fixture-0.6.3.3-py3-none-any.whl", hash = "sha256:a56a55649147ff960ff79d4b2c781a4f769351abc1876873f3116d0bd0c96353"},
]
[[package]]
name = "types-requests"
version = "2.28.5"
version = "2.31.0.0"
description = "Typing stubs for requests"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-requests-2.28.5.tar.gz", hash = "sha256:ac618bfefcb3742eaf97c961e13e9e5a226e545eda4a3dbe293b898d40933ad1"},
{file = "types_requests-2.28.5-py3-none-any.whl", hash = "sha256:98ab647ae88b5e2c41d6d20cfcb5117da1bea561110000b6fdeeea07b3e89877"},
{file = "types-requests-2.31.0.0.tar.gz", hash = "sha256:c1c29d20ab8d84dff468d7febfe8e0cb0b4664543221b386605e14672b44ea25"},
{file = "types_requests-2.31.0.0-py3-none-any.whl", hash = "sha256:7c5cea7940f8e92ec560bbc468f65bf684aa3dcf0554a6f8c4710f5f708dc598"},
]
[package.dependencies]
types-urllib3 = "<1.27"
types-urllib3 = "*"
[[package]]
name = "types-s3transfer"
@@ -2332,14 +2335,14 @@ files = [
[[package]]
name = "types-toml"
version = "0.10.8"
version = "0.10.8.6"
description = "Typing stubs for toml"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-toml-0.10.8.tar.gz", hash = "sha256:b7e7ea572308b1030dc86c3ba825c5210814c2825612ec679eb7814f8dd9295a"},
{file = "types_toml-0.10.8-py3-none-any.whl", hash = "sha256:8300fd093e5829eb9c1fba69cee38130347d4b74ddf32d0a7df650ae55c2b599"},
{file = "types-toml-0.10.8.6.tar.gz", hash = "sha256:6d3ac79e36c9ee593c5d4fb33a50cca0e3adceb6ef5cff8b8e5aef67b4c4aaf2"},
{file = "types_toml-0.10.8.6-py3-none-any.whl", hash = "sha256:de7b2bb1831d6f7a4b554671ffe5875e729753496961b3e9b202745e4955dafa"},
]
[[package]]
@@ -2356,14 +2359,14 @@ files = [
[[package]]
name = "typing-extensions"
version = "4.3.0"
version = "4.6.1"
description = "Backported and Experimental Type Hints for Python 3.7+"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "typing_extensions-4.3.0-py3-none-any.whl", hash = "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"},
{file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"},
{file = "typing_extensions-4.6.1-py3-none-any.whl", hash = "sha256:6bac751f4789b135c43228e72de18637e9a6c29d12777023a703fd1a6858469f"},
{file = "typing_extensions-4.6.1.tar.gz", hash = "sha256:558bc0c4145f01e6405f4a5fdbd82050bd221b119f4bf72a961a1cfd471349d6"},
]
[[package]]
@@ -2611,4 +2614,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "b689ffd6eae32b966f1744b5ac3343fe0dd26b31ee1f50e13daf5045ee0623e1"
content-hash = "c6c217033f50430c31b0979b74db222e6bab2301abd8b9f0cce5a9d5bccc578f"

View File

@@ -1,6 +1,6 @@
# Proxy
Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following backends are currently implemented:
Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following routing backends are currently implemented:
* console
new SCRAM-based console API; uses SNI info to select the destination project (endpoint soon)
@@ -9,6 +9,90 @@ Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme a
* link
sends login link for all usernames
Also proxy can expose following services to the external world:
* postgres protocol over TCP -- usual postgres endpoint compatible with usual
postgres drivers
* postgres protocol over WebSockets -- same protocol tunneled over websockets
for environments where TCP connection is not available. We have our own
implementation of a client that uses node-postgres and tunnels traffic through
websockets: https://github.com/neondatabase/serverless
* SQL over HTTP -- service that accepts POST requests with SQL text over HTTP
and responds with JSON-serialised results.
## SQL over HTTP
Contrary to the usual postgres proto over TCP and WebSockets using plain
one-shot HTTP request achieves smaller amortized latencies in edge setups due to
fewer round trips and an enhanced open connection reuse by the v8 engine. Also
such endpoint could be used directly without any driver.
To play with it locally one may start proxy over a local postgres installation
(see end of this page on how to generate certs with openssl):
```
./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444
```
If both postgres and proxy are running you may send a SQL query:
```json
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
-H 'Content-Type: application/json' \
--data '{
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
"params":[ "{{1,2},{\"3\",4}}", {"key":"val", "ikey":4242}]
}' | jq
{
"command": "SELECT",
"fields": [
{ "dataTypeID": 1007, "name": "arr" },
{ "dataTypeID": 3802, "name": "obj" },
{ "dataTypeID": 23, "name": "num" }
],
"rowCount": 1,
"rows": [
{
"arr": [[1,2],[3,4]],
"num": 42,
"obj": {
"ikey": 4242,
"key": "val"
}
}
]
}
```
With the current approach we made the following design decisions:
1. SQL injection protection: We employed the extended query protocol, modifying
the rust-postgres driver to send queries in one roundtrip using a text
protocol rather than binary, bypassing potential issues like those identified
in sfackler/rust-postgres#1030.
2. Postgres type compatibility: As not all postgres types have binary
representations (e.g., acl's in pg_class), we adjusted rust-postgres to
respond with text protocol, simplifying serialization and fixing queries with
text-only types in response.
3. Data type conversion: Considering JSON supports fewer data types than
Postgres, we perform conversions where possible, passing all other types as
strings. Key conversions include:
- postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
text)
- postgres bool, null, text -> json bool, null, string
- postgres array -> json array
- postgres json and jsonb -> json object
4. Alignment with node-postgres: To facilitate integration with js libraries,
we've matched the response structure of node-postgres, returning command tags
and column oids. Command tag capturing was added to the rust-postgres
functionality as part of this change.
## Using SNI-based routing on localhost
Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:

View File

@@ -139,6 +139,16 @@ async fn auth_quirks(
}
impl BackendType<'_, ClientCredentials<'_>> {
/// Get compute endpoint name from the credentials.
pub fn get_endpoint(&self) -> Option<String> {
use BackendType::*;
match self {
Console(_, creds) => creds.project.clone(),
Postgres(_, creds) => creds.project.clone(),
Link(_) => Some("link".to_owned()),
}
}
/// Authenticate the client via the requested backend, possibly using credentials.
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(

View File

@@ -100,9 +100,10 @@ impl CertResolver {
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
let key_bytes = std::fs::read(key_path)
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to parse TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()

View File

@@ -3,6 +3,7 @@
//! directly relying on deps like `reqwest` (think loose coupling).
pub mod server;
pub mod sql_over_http;
pub mod websocket;
pub use reqwest::{Request, Response, StatusCode};

View File

@@ -0,0 +1,603 @@
use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::{Body, HeaderMap, Request};
use pq_proto::StartupMessageParams;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use url::Url;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>,
}
const APP_NAME: &str = "sql_over_http";
const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
//
// Convert json non-string types to strings, so that they can be passed to Postgres
// as parameters.
//
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
json.iter()
.map(|value| {
match value {
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// no need to escape
Value::String(s) => Ok(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
})
.collect()
}
//
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
// in the array we need to escape the strings. Postgres is okay with arrays of form
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
// it for Postgres to check.
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
fn json_array_to_pg_array(value: &Value) -> Result<String, serde_json::Error> {
match value {
// same
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// now needs to be escaped, as it is part of the array
Value::String(_) => serde_json::to_string(value),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.collect::<Result<Vec<_>, _>>()?
.join(",");
Ok(format!("{{{}}}", vals))
}
}
}
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
) -> Result<(String, String, String, String), anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
.ok_or(anyhow::anyhow!("missing connection string"))?
.to_str()?;
let connection_url = Url::parse(connection_string)?;
let protocol = connection_url.scheme();
if protocol != "postgres" && protocol != "postgresql" {
return Err(anyhow::anyhow!(
"connection string must start with postgres: or postgresql:"
));
}
let mut url_path = connection_url
.path_segments()
.ok_or(anyhow::anyhow!("missing database name"))?;
let dbname = url_path
.next()
.ok_or(anyhow::anyhow!("invalid database name"))?;
let username = connection_url.username();
if username.is_empty() {
return Err(anyhow::anyhow!("missing username"));
}
let password = connection_url
.password()
.ok_or(anyhow::anyhow!("no password"))?;
// TLS certificate selector now based on SNI hostname, so if we are running here
// we are sure that SNI hostname is set to one of the configured domain names.
let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?;
let hostname = connection_url
.host_str()
.ok_or(anyhow::anyhow!("no host"))?;
let host_header = headers
.get("host")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split(':').next());
if hostname != sni_hostname {
return Err(anyhow::anyhow!("mismatched SNI hostname and hostname"));
} else if let Some(h) = host_header {
if h != hostname {
return Err(anyhow::anyhow!("mismatched host header and hostname"));
}
}
Ok((
username.to_owned(),
dbname.to_owned(),
hostname.to_owned(),
password.to_owned(),
))
}
// TODO: return different http error codes
pub async fn handle(
config: &'static ProxyConfig,
request: Request<Body>,
sni_hostname: Option<String>,
) -> anyhow::Result<Value> {
//
// Determine the destination and connection params
//
let headers = request.headers();
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &username),
("database", &dbname),
("application_name", APP_NAME),
]);
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
//
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let creds = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
None => MAX_REQUEST_SIZE + 1,
};
if request_content_length > MAX_REQUEST_SIZE {
return Err(anyhow::anyhow!(
"request is too large (max {MAX_REQUEST_SIZE} bytes)"
));
}
//
// Read the query and query params from the request body
//
let body = hyper::body::to_bytes(request.into_body()).await?;
let QueryData { query, params } = serde_json::from_slice(&body)?;
let query_params = json_to_pg_text(params)?;
//
// Connenct to the destination
//
let (client, connection) = tokio_postgres::Config::new()
.host(host)
.port(port)
.user(&username)
.password(&password)
.dbname(&dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
//
// Now execute the query and return the result
//
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
let mut curret_size = 0;
while let Some(row) = row_stream.next().await {
let row = row?;
curret_size += row.body_len();
rows.push(row);
if curret_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!("response too large"));
}
}
// grab the command tag and number of rows affected
let command_tag = row_stream.command_tag().unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
// INSERT returns OID first and then number of rows
command_tag_split.nth(1)
} else {
// other commands return number of rows (if any)
command_tag_split.next()
}
.and_then(|s| s.parse::<i64>().ok());
let fields = if !rows.is_empty() {
rows[0]
.columns()
.iter()
.map(|c| {
json!({
"name": Value::String(c.name().to_owned()),
"dataTypeID": Value::Number(c.type_().oid().into()),
})
})
.collect::<Vec<_>>()
} else {
Vec::new()
};
// convert rows to JSON
let rows = rows
.iter()
.map(pg_text_row_to_json)
.collect::<Result<Vec<_>, _>>()?;
// resulting JSON format is based on the format of node-postgres result
Ok(json!({
"command": command_tag_name,
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
}))
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
let res = row
.columns()
.iter()
.enumerate()
.map(|(i, column)| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = pg_text_to_json(pg_value, column.type_())?;
Ok((name.to_string(), json_value))
})
.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(res))
}
//
// Convert postgres text-encoded value to JSON value
//
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
if let Some(val) = pg_value {
if val == "NULL" {
return Ok(Value::Null);
}
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres we don't check that all nested arrays have the same
// dimensions, we just return them as is.
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
}
fn _pg_array_parse(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), anyhow::Error> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
if c == '\\' {
escaped = true;
(i, c) = pg_array_chr.next().unwrap();
}
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' => {
level -= 1;
if level == 0 {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
}
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// push even if empty
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
}
quote = !quote;
}
',' if !quote => {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
}
}
_ => {
entry.push(c);
}
}
}
if level != 0 {
return Err(anyhow::anyhow!("unbalanced array"));
}
Ok((Value::Array(entries), 0))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["true", "false"]);
let json = vec![Value::Number(serde_json::Number::from(42))];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["42"]);
let json = vec![Value::String("foo\"".to_string())];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["foo\""]);
let json = vec![Value::Null];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["null"]);
}
#[test]
fn test_json_array_to_pg_array() {
// atoms and escaping
let json = "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}"]
);
// nested arrays
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}"]
);
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
json!("Infinity")
);
assert_eq!(
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
json!("-Infinity")
);
let json: Value =
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
.unwrap();
assert_eq!(
pg_text_to_json(
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
&Type::JSONB
)
.unwrap(),
json
);
}
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
json!(["aa\"\\,a", "cha", "bbbb"])
);
assert_eq!(
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
json!([["foo", "bar"], ["bee", "bop"]])
);
assert_eq!(
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
json!([[[["foo", null, "bop", "bup"]]]])
);
assert_eq!(
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
);
}
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
assert_eq!(
pb(r#"{{t,f},{f,t}}"#),
json!([[true, false], [false, true]])
);
assert_eq!(
pb(r#"{{t,NULL},{NULL,f}}"#),
json!([[true, null], [null, false]])
);
}
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty).unwrap()
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
json!(["NaN", "Infinity", "-Infinity"])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
json!(["NaN", "Infinity", "-Infinity"])
);
}
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2).unwrap()
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
}

View File

@@ -4,12 +4,17 @@ use crate::{
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hyper::{
server::{accept, conn::AddrIncoming},
server::{
accept,
conn::{AddrIncoming, AddrStream},
},
upgrade::Upgraded,
Body, Request, Response, StatusCode,
Body, Method, Request, Response, StatusCode,
};
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
use pin_project_lite::pin_project;
use serde_json::{json, Value};
use std::{
convert::Infallible,
future::ready,
@@ -21,6 +26,7 @@ use tls_listener::TlsListener;
use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
select,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
@@ -30,6 +36,8 @@ use utils::http::{error::ApiError, json::json_response};
// Tracking issue: https://github.com/rust-lang/rust/issues/98407.
use sync_wrapper::SyncWrapper;
use super::sql_over_http;
pin_project! {
/// This is a wrapper around a [`WebSocketStream`] that
/// implements [`AsyncRead`] and [`AsyncWrite`].
@@ -159,6 +167,7 @@ async fn ws_handler(
config: &'static ProxyConfig,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
) -> Result<Response<Body>, ApiError> {
let host = request
.headers()
@@ -181,8 +190,44 @@ async fn ws_handler(
// Return the response so the spawned future can continue.
Ok(response)
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(anyhow::anyhow!("Query timed out"))
}
response = sql_over_http::handle(config, request, sni_hostname) => {
response
}
};
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
};
let json = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
json!({ "message": message, "code": code })
}
};
json_response(status_code, json).map(|mut r| {
r.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
r
})
} else {
json_response(StatusCode::OK, "Connect with a websocket client")
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
@@ -216,20 +261,27 @@ pub async fn task_main(
}
});
let make_svc = hyper::service::make_service_fn(|_stream| async move {
Ok::<_, Infallible>(hyper::service::service_fn(
move |req: Request<Body>| async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, cancel_map, session_id)
.instrument(info_span!(
"ws-client",
session = format_args!("{session_id}")
))
.await
},
))
});
let make_svc =
hyper::service::make_service_fn(|stream: &tokio_rustls::server::TlsStream<AddrStream>| {
let sni_name = stream.get_ref().1.sni_hostname().map(|s| s.to_string());
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = format_args!("{session_id}")
))
.await
}
}))
}
});
hyper::Server::builder(accept::from_stream(tls_listener))
.serve(make_svc)

View File

@@ -455,6 +455,9 @@ impl<'a, S> Client<'a, S> {
impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
/// Let the client authenticate and connect to the designated compute node.
// Instrumentation logs endpoint name everywhere. Doesn't work for link
// auth; strictly speaking we don't know endpoint name in its case.
#[tracing::instrument(name = "", fields(ep = self.creds.get_endpoint().unwrap_or("".to_owned())), skip_all)]
async fn connect_to_db(
self,
session: cancellation::Session<'_>,

View File

@@ -6,40 +6,41 @@ authors = []
[tool.poetry.dependencies]
python = "^3.9"
pytest = "^6.2.5"
pytest = "^7.3.1"
psycopg2-binary = "^2.9.1"
typing-extensions = "^4.1.0"
typing-extensions = "^4.6.1"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.26.0"
pytest-xdist = "^3.0.2"
requests = "^2.31.0"
pytest-xdist = "^3.3.1"
asyncpg = "^0.27.0"
aiopg = "^1.3.1"
Jinja2 = "^3.0.2"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.10"
boto3 = "^1.26.16"
boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
moto = {extras = ["server"], version = "^4.1.2"}
backoff = "^1.11.1"
backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^2.2.3"
pytest-order = "^1.0.1"
allure-pytest = "^2.13.1"
pytest-asyncio = "^0.19.0"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"
toml = "^0.10.2"
psutil = "^5.9.4"
types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
aiohttp = "3.7.4"
pytest-rerunfailures = "^11.1.2"
types-pytest-lazy-fixture = "^0.6.3.3"
[tool.poetry.group.dev.dependencies]
black = "^23.1.0"
mypy = "==1.1.1"
ruff = "^0.0.255"
black = "^23.3.0"
mypy = "==1.3.0"
ruff = "^0.0.269"
[build-system]
requires = ["poetry-core>=1.0.0"]

View File

@@ -156,7 +156,9 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None) -> None:
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -180,14 +182,18 @@ class LLVM:
*objects,
*sources,
]
subprocess.check_call(cmd, cwd=cwd)
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, **kwargs) -> None:
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, **kwargs)
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -282,10 +288,19 @@ class TextReport(Report):
def generate(self) -> None:
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class JsonReport(Report):
output_file: Path
class LcovReport(Report):
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
self.llvm.cov_export(kind='text', output_file=self.output_file, **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
@dataclass
@@ -474,8 +489,10 @@ class State:
lambda: HtmlReport(**params, output_dir=self.report_dir),
'text':
lambda: TextReport(**params),
'json':
lambda: JsonReport(**params, output_file=self.report_dir / 'coverage.json'),
'lcov':
lambda: LcovReport(**params),
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
'summary':
lambda: SummaryReport(**params),
'github':
@@ -543,7 +560,7 @@ self-contained example:
help='cargo build profile')
p_report.add_argument('--format',
default='html',
choices=('html', 'text', 'summary', 'lcov', 'github'),
choices=('html', 'text', 'summary', 'json', 'lcov', 'github'),
help='report format')
p_report.add_argument('--input-objects',
metavar='FILE',

View File

@@ -162,7 +162,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -535,8 +535,8 @@ def export_timeline(
def main(args: argparse.Namespace):
# any psql version will do here. use current DEFAULT_PG_VERSION = 14
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
# any psql version will do here. use current DEFAULT_PG_VERSION = 15
psql_path = str(Path(args.pg_distrib_dir) / "v15" / "bin" / "psql")
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host

View File

@@ -35,7 +35,7 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr) as conn:
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with conn.cursor() as cur:
yield cur

View File

@@ -40,6 +40,9 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
//
// NB: this function is not async, but still must be run on a tokio runtime thread
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,

View File

@@ -312,6 +312,6 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
implementation-specific logic is widely useful across multiple tests, it might
make sense to add methods to the PgCompare class.
"""
fixture = request.getfixturevalue(request.param) # type: ignore
fixture = request.getfixturevalue(request.param)
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
return fixture

View File

@@ -26,7 +26,7 @@ from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff # type: ignore
import backoff
import boto3
import jwt
import psycopg2
@@ -354,7 +354,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -1603,8 +1603,6 @@ class NeonPageserver(PgProtocol):
# https://github.com/neondatabase/neon/issues/2442
".*could not remove ephemeral file.*No such file or directory.*",
# FIXME: These need investigation
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
@@ -1621,6 +1619,8 @@ class NeonPageserver(PgProtocol):
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
]
def start(
@@ -2042,15 +2042,19 @@ class NeonProxy(PgProtocol):
proxy_port: int,
http_port: int,
mgmt_port: int,
external_http_port: int,
auth_backend: NeonProxy.AuthBackend,
metric_collection_endpoint: Optional[str] = None,
metric_collection_interval: Optional[str] = None,
):
host = "127.0.0.1"
super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port)
domain = "proxy.localtest.me" # resolves to 127.0.0.1
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
self.domain = domain
self.host = host
self.http_port = http_port
self.external_http_port = external_http_port
self.neon_binpath = neon_binpath
self.test_output_dir = test_output_dir
self.proxy_port = proxy_port
@@ -2062,11 +2066,42 @@ class NeonProxy(PgProtocol):
def start(self) -> NeonProxy:
assert self._popen is None
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
if not key_path.exists():
r = subprocess.run(
[
"openssl",
"req",
"-new",
"-x509",
"-days",
"365",
"-nodes",
"-text",
"-out",
str(crt_path),
"-keyout",
str(key_path),
"-subj",
"/CN=*.localtest.me",
"-addext",
"subjectAltName = DNS:*.localtest.me",
]
)
assert r.returncode == 0
args = [
str(self.neon_binpath / "proxy"),
*["--http", f"{self.host}:{self.http_port}"],
*["--proxy", f"{self.host}:{self.proxy_port}"],
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
*["--wss", f"{self.host}:{self.external_http_port}"],
*["-c", str(crt_path)],
*["-k", str(key_path)],
*self.auth_backend.extra_args(),
]
@@ -2190,6 +2225,7 @@ def link_proxy(
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -2197,6 +2233,7 @@ def link_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
@@ -2224,6 +2261,7 @@ def static_proxy(
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -2231,6 +2269,7 @@ def static_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Postgres(auth_endpoint),
) as proxy:
proxy.start()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import json
import time
from collections import defaultdict
from dataclasses import dataclass
@@ -109,6 +110,10 @@ class PageserverHttpClient(requests.Session):
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@property
def base_url(self) -> str:
return f"http://localhost:{self.port}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
@@ -168,8 +173,22 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
):
if config_null:
assert config is None
body = "null"
else:
# null-config is prohibited by the API
if config is None:
config = {}
body = json.dumps({"config": config})
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
data=body,
headers={"Content-Type": "application/json"},
)
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):

View File

@@ -27,6 +27,10 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
def __str__(self) -> str:
return self.value
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometime we need to do so in tests.
@property
@@ -78,11 +82,11 @@ def pytest_addoption(parser: Parser):
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version commad-line argument"
version, source = v, "from --pg-version command-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default verson"
version, source = DEFAULT_VERSION, "default version"
log.info(f"pg_version is {version} ({source})")
yield version

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -0,0 +1,76 @@
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
n_steps = 10
n_update_iters = 100
step_size = 10000
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
)
cur.execute("CREATE INDEX ON t(step)")
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
# without modifying the earlier parts of the table.
for step in range(n_steps):
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
for i in range(n_update_iters):
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
cur.execute("vacuum t")
# cur.execute("select pg_table_size('t')")
# logical_size = cur.fetchone()[0]
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
log.info(f"Logical storage size {logical_size}")
client.timeline_checkpoint(tenant_id, timeline_id)
# Do compaction and GC
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
# One more iteration to check that no excessive image layers are generated
client.timeline_gc(tenant_id, timeline_id, 0)
client.timeline_compact(tenant_id, timeline_id)
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
MB = 1024 * 1024
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
zenbenchmark.record(
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
)

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -6,7 +6,7 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import PgCompare
from fixtures.log_helper import log
from pytest_lazyfixture import lazy_fixture # type: ignore
from pytest_lazyfixture import lazy_fixture
@pytest.mark.parametrize(

View File

@@ -0,0 +1,200 @@
from dataclasses import dataclass
from typing import Generator, Optional
import pytest
from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
)
from fixtures.pageserver.http import PageserverApiException, TenantConfig
from fixtures.types import TenantId
from fixtures.utils import wait_until
@pytest.fixture
def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
return env
@dataclass
class NegativeTests:
neon_env: NeonEnv
tenant_id: TenantId
config_pre_detach: TenantConfig
@pytest.fixture
def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, None, None]:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
yield NegativeTests(env, tenant_id, config_pre_detach)
assert tenant_id not in [
TenantId(t["id"]) for t in ps_http.tenant_list()
], "tenant should not be attached after negative test"
env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request")
def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")
wait_until(50, 0.1, log_contains_bad_request)
def test_null_body(negative_env: NegativeTests):
"""
If we send `null` in the body, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"null",
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_null_config(negative_env: NegativeTests):
"""
If the `config` field is `null`, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b'{"config": null}',
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests):
"""
If we send a config with unknown keys, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
config_with_unknown_keys = {
"compaction_period": "1h",
"this_key_does_not_exist": "some value",
}
with pytest.raises(PageserverApiException) as e:
ps_http.tenant_attach(tenant_id, config=config_with_unknown_keys)
assert e.type == PageserverApiException
assert e.value.status_code == 400
@pytest.mark.parametrize("content_type", [None, "application/json"])
def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]):
"""
For backwards-compatiblity: if we send an empty body,
the request should be accepted and the config should be the default config.
"""
env = positive_env
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"",
headers=None if content_type else {"Content-Type": "application/json"},
).raise_for_status()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
assert ps_http.tenant_config(tenant_id).effective_config == config_pre_detach.effective_config
def test_fully_custom_config(positive_env: NeonEnv):
"""
If we send a valid config in the body, the request should be accepted and the config should be applied.
"""
env = positive_env
fully_custom_config = {
"compaction_period": "1h",
"compaction_threshold": 13,
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",
"eviction_policy": {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
}
ps_http = env.pageserver.http_client()
initial_tenant_config = ps_http.tenant_config(env.initial_tenant)
assert initial_tenant_config.tenant_specific_overrides == {}
assert set(initial_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
(tenant_id, _) = env.neon_cli.create_tenant()
ps_http.set_tenant_config(tenant_id, fully_custom_config)
our_tenant_config = ps_http.tenant_config(tenant_id)
assert our_tenant_config.tenant_specific_overrides == fully_custom_config
assert set(our_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
assert {
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
} == {
k: True for k in fully_custom_config.keys()
}, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
ps_http.tenant_detach(tenant_id)
ps_http.tenant_attach(tenant_id, config=fully_custom_config)
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"

View File

@@ -0,0 +1,210 @@
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def handle_db(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in dbs:
dbs[operation["name"]] = dbs[operation["old_name"]]
dbs.pop(operation["old_name"])
if "owner" in operation:
dbs[operation["name"]] = operation["owner"]
elif operation["op"] == "del":
dbs.pop(operation["name"])
else:
raise ValueError("Invalid op")
def handle_role(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in roles:
roles[operation["name"]] = roles[operation["old_name"]]
roles.pop(operation["old_name"])
for db, owner in dbs.items():
if owner == operation["old_name"]:
dbs[db] = operation["name"]
if "password" in operation:
roles[operation["name"]] = operation["password"]
elif operation["op"] == "del":
if "old_name" in operation:
roles.pop(operation["old_name"])
roles.pop(operation["name"])
else:
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
log.info("Received invalid JSON")
return Response(status=400)
json = request.json
# Handle roles first
if "roles" in json:
for operation in json["roles"]:
handle_role(dbs, roles, operation)
if "dbs" in json:
for operation in json["dbs"]:
handle_db(dbs, roles, operation)
return Response(status=200)
class DdlForwardingContext:
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
self.server = httpserver
self.pg = vanilla_pg
self.host = host
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
[
f"neon.console_url={ddl_url}",
"shared_preload_libraries = 'neon'",
]
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
)
def __enter__(self):
self.pg.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
self.pg.stop()
def send(self, query: str) -> List[Tuple[Any, ...]]:
return self.pg.safe_psql(query)
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
return res
@pytest.fixture(scope="function")
def ddl(
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
):
(host, port) = httpserver_listen_address
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
yield ddl
def test_ddl_forwarding(ddl: DdlForwardingContext):
curr_user = ddl.send("SELECT current_user")[0][0]
log.info(f"Current user is {curr_user}")
ddl.send_and_wait("CREATE DATABASE bork")
assert ddl.dbs == {"bork": curr_user}
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
assert ddl.dbs == {"nu_pogodi": curr_user}
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
assert ddl.dbs == {"nu_pogodi": "volk"}
ddl.send_and_wait("DROP DATABASE nu_pogodi")
assert ddl.dbs == {}
ddl.send_and_wait("DROP ROLE volk")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("DROP ROLE tarzan")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
assert ddl.roles == {"tarzan": "jungle_man"}
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
assert ddl.roles == {"mowgli": "jungle_man"}
ddl.send_and_wait("DROP ROLE mowgli")
assert ddl.roles == {}
conn = ddl.pg.connect()
cur = conn.cursor()
cur.execute("BEGIN")
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
cur.execute("ABORT")
ddl.wait()
assert ("stork", "pork") not in ddl.roles.items()
cur.execute("BEGIN")
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
cur.execute("ALTER ROLE bork RENAME TO stork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
cur.execute("SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
cur.execute("ALTER ROLE dork RENAME TO fork")
cur.execute("ROLLBACK TO SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
cur.execute("ALTER ROLE dork RENAME TO zork")
cur.execute("RELEASE SAVEPOINT point")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork", "zork": "fork"}
cur.execute("DROP ROLE stork")
cur.execute("DROP ROLE zork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
cur.execute("BEGIN")
cur.execute("DROP ROLE bork")
cur.execute("ALTER ROLE stork RENAME TO bork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("DROP ROLE bork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
cur.execute("ALTER ROLE bork RENAME TO cork")
ddl.wait()
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
conn.close()

View File

@@ -204,6 +204,7 @@ def proxy_with_metric_collector(
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
@@ -215,6 +216,7 @@ def proxy_with_metric_collector(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
@@ -226,7 +228,6 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -1,22 +1,32 @@
import json
import subprocess
from typing import Any, List
import psycopg2
import pytest
import requests
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
def test_proxy_select_1(static_proxy: NeonProxy):
"""
A simplest smoke test: check proxy against a local postgres instance.
"""
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
out = static_proxy.safe_psql("select 1", sslsni=0, options="project=generic-project-name")
assert out[0][0] == 1
# no SNI, new `options=endpoint` syntax
out = static_proxy.safe_psql("select 1", sslsni=0, options="endpoint=generic-project-name")
assert out[0][0] == 1
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_password_hack(static_proxy: NeonProxy, option_name: str):
# with SNI
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
assert out[0][0] == 42
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for
clients which can't provide the project/endpoint name via SNI or `options`.
@@ -24,14 +34,16 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str):
user = "borat"
password = "password"
static_proxy.safe_psql(
f"create role {user} with login password '{password}'",
options=f"{option_name}=irrelevant",
)
static_proxy.safe_psql(f"create role {user} with login password '{password}'")
# Note the format of `magic`!
magic = f"{option_name}=irrelevant;{password}"
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
magic = f"project=irrelevant;{password}"
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
assert out[0][0] == 1
magic = f"endpoint=irrelevant;{password}"
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
assert out[0][0] == 1
# Must also check that invalid magic won't be accepted.
with pytest.raises(psycopg2.OperationalError):
@@ -69,52 +81,55 @@ def test_proxy_options(static_proxy: NeonProxy, option_name: str):
"""
options = f"{option_name}=irrelevant -cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
out = static_proxy.safe_psql("show proxytest.option", options=options, sslsni=0)
assert out[0][0] == "value"
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
out = static_proxy.safe_psql("show proxytest.foo", options=options, sslsni=0)
assert out[0][0] == " str"
options = "-cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
assert out[0][0] == "value"
options = "-c proxytest.foo=\\ str"
out = static_proxy.safe_psql("show proxytest.foo", options=options)
assert out[0][0] == " str"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
def test_auth_errors(static_proxy: NeonProxy):
"""
Check that we throw very specific errors in some unsuccessful auth scenarios.
"""
# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
assert text.find("password authentication failed for user 'pinocchio'") != -1
static_proxy.safe_psql(
"create role pinocchio with login password 'magic'",
options=f"{option_name}=irrelevant",
)
# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password=None)
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
assert text.find("password authentication failed for user 'pinocchio'") != -1
# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password="bad")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
assert text.find("password authentication failed for user 'pinocchio'") != -1
# Finally, check that the user can connect
with static_proxy.connect(
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
):
with static_proxy.connect(user="pinocchio", password="magic"):
pass
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
def test_forward_params_to_client(static_proxy: NeonProxy):
"""
Check that we forward all necessary PostgreSQL server params to client.
"""
@@ -140,7 +155,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
where name = any(%s)
"""
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
with static_proxy.connect() as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
@@ -148,18 +163,65 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
assert conn.get_parameter_status(name) == value
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
def test_close_on_connections_exit(static_proxy: NeonProxy):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
options=f"{option_name}=irrelevant"
):
with static_proxy.connect(), static_proxy.connect():
static_proxy.terminate()
with pytest.raises(subprocess.TimeoutExpired):
static_proxy.wait_for_exit(timeout=2)
# Ensure we don't accept any more connections
with pytest.raises(psycopg2.OperationalError):
static_proxy.connect(options=f"{option_name}=irrelevant")
static_proxy.connect()
static_proxy.wait_for_exit()
def test_sql_over_http(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
def q(sql: str, params: List[Any] = []) -> Any:
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": sql, "params": params}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
return response.json()
rows = q("select 42 as answer")["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1 as answer", [42])["rows"]
assert rows == [{"answer": "42"}]
rows = q("select $1 * 1 as answer", [42])["rows"]
assert rows == [{"answer": 42}]
rows = q("select $1::int[] as answer", [[1, 2, 3]])["rows"]
assert rows == [{"answer": [1, 2, 3]}]
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1
res = q("create table t(id serial primary key, val int)")
assert res["command"] == "CREATE"
assert res["rowCount"] is None
res = q("insert into t(val) values (10), (20), (30) returning id")
assert res["command"] == "INSERT"
assert res["rowCount"] == 3
assert res["rows"] == [{"id": 1}, {"id": 2}, {"id": 3}]
res = q("select * from t")
assert res["command"] == "SELECT"
assert res["rowCount"] == 3
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None

View File

@@ -4,7 +4,7 @@ from pathlib import Path
from types import TracebackType
from typing import Optional, Type
import backoff # type: ignore
import backoff
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres

View File

@@ -62,6 +62,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
log.info(f"show {env.initial_tenant}")
pscur.execute(f"show {env.initial_tenant}")
res = pscur.fetchone()
assert res is not None
assert all(
i in res.items()
for i in {
@@ -101,6 +102,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -151,6 +153,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
"eviction_policy": json.dumps(
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
),
"max_lsn_wal_lag": "13000000",
}
env.neon_cli.config_tenant(
tenant_id=tenant,
@@ -162,6 +165,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after config res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -206,6 +210,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
assert updated_effective_config["gc_horizon"] == 67108864
assert updated_effective_config["image_creation_threshold"] == 2
assert updated_effective_config["pitr_interval"] == "7days"
assert updated_effective_config["max_lsn_wal_lag"] == 13000000
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
@@ -216,6 +221,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {
@@ -265,6 +271,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
"period": "20s",
"threshold": "23h",
}
assert final_effective_config["max_lsn_wal_lag"] == 10 * 1024 * 1024
# restart the pageserver and ensure that the config is still correct
env.pageserver.stop()
@@ -275,6 +282,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
i in res.items()
for i in {

View File

@@ -27,7 +27,6 @@ futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -39,7 +38,7 @@ num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
regex-syntax = { version = "0.7" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.20", features = ["dangerous_configuration"] }
@@ -62,7 +61,6 @@ url = { version = "2", features = ["serde"] }
anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -70,7 +68,7 @@ memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
regex-syntax = { version = "0.7" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }