Compare commits

..

57 Commits

Author SHA1 Message Date
Dmitry Rodionov
c883d851e2 tests: disable gc as intended 2023-04-07 20:04:06 +03:00
Stas Kelvich
0bf70e113f Add extra cnames to staging proxy 2023-04-07 19:18:19 +03:00
Vadim Kharitonov
31f2cdeb1e Update Dockerfile.compute-node
Co-authored-by: MMeent <matthias@neon.tech>
2023-04-07 15:26:22 +02:00
Vadim Kharitonov
979fa8b1ba Compile timescaledb 2023-04-07 15:26:22 +02:00
Konstantin Knizhnik
bfee412701 Trigger tests for index scan implementation (#3968)
## Describe your changes

## Issue ticket number and link

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-04-07 14:26:21 +03:00
Dmitry Rodionov
bfeb428d1b tests: make neon_fixtures a bit thinner by splitting out some pageserver related helpers (#3977)
neon_fixture is quite big and messy, lets clean it up a bit.
2023-04-07 13:47:28 +03:00
Stas Kelvich
b1c2a6384a Set non-wildcard common names in link auth proxy
Old coding here ignored non-wildcard common names and passed None instead. With my recent changes
I started throwing an error in that case. Old logic doesn't seem to be a great choice, so instead
of passing None I actually set non-wildcard common names too. That way it is possible to avoid handling
cases with None in downstream code.
2023-04-07 01:24:27 +03:00
Anastasia Lubennikova
6d01d835a8 [proxy] Report error if proxy_io_bytes_per_client metric has decreased 2023-04-06 23:14:07 +03:00
Alexey Kondratov
e42982fb1e [compute_ctl] Empty computes and /configure API (#3963)
This commit adds an option to start compute without spec and then pass
it a valid spec via `POST /configure` API endpoint. This is a main
prerequisite for maintaining the pool of compute nodes in the
control-plane.

For example:

1. Start compute with
   ```shell
   cargo run --bin compute_ctl -- -i no-compute \
    -p http://localhost:9095 \
    -D compute_pgdata \
    -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \
    -b ./pg_install/v15/bin/postgres
   ```

2. Configure it with
   ```shell
   curl -d "{\"spec\": $(cat ./compute-spec.json)}" http://localhost:3080/configure
   ```

Internally, it's implemented using a `Condvar` + `Mutex`. Compute spec
is moved under Mutex, as it's now could be updated in the http handler.
Also `RwLock` was replaced with `Mutex` because the latter works well
with `Condvar`.

First part of the neondatabase/cloud#4433
2023-04-06 21:21:58 +02:00
Dmitry Rodionov
b45c92e533 tests: exclude compatibility tests by default (#3975)
This allows to skip compatibility tests based on `CHECK_ONDISK_DATA_COMPATIBILITY` environment variable. When the variable is missing (default) compatibility tests wont be run.
2023-04-06 21:21:39 +03:00
Arthur Petukhovsky
ba4a96fdb1 Eagerly update wal_backup_lsn after each segment offload (#3976)
Otherwise it can lag a lot, preventing WAL segments cleanup. Also max
wal_backup_lsn on update, pulling it down is pointless.

Should help with https://github.com/neondatabase/neon/issues/3957, but
will not fix it completely.
2023-04-06 20:57:06 +03:00
Alexander Bayandin
4d64edf8a5 Nightly Benchmarks: Add free tier sized compute (#3969)
- Add support for VMs and CU
- Add free tier limited benchmark (0.25 CU)
- Ensure we use 1 CU by default for pgbench workload
2023-04-06 19:18:24 +03:00
Kirill Bulatov
102746bc8f Apply clippy rule exclusion locally instead of a global approach (#3974) 2023-04-06 18:57:48 +03:00
Alexander Bayandin
887cee64e2 test_runner: add links to grafana for remote tests (#3961)
Add Grafana links to allure reports to make it easier to debug perf
test failures
2023-04-06 13:52:41 +01:00
Vadim Kharitonov
2ce973c72f Allow installation of pg_stat_statements 2023-04-06 13:26:40 +02:00
Gleb Novikov
9db70f6232 Added disk_size and instance_type to payload (#3918)
## Describe your changes

In https://github.com/neondatabase/cloud/issues/4354 we are making
scheduling of projects based on available disk space and overcommit, so
we need to know disk size and just in case instance type of the
pageserver

## Issue ticket number and link

https://github.com/neondatabase/cloud/issues/4354

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [ ] ~If it is a core feature, I have added thorough tests.~
- [ ] ~Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?~
- [ ] ~If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.~
2023-04-06 14:02:56 +04:00
Joonas Koivunen
b17c24fa38 fix: settle down to configured percent (#3947)
in real env testing we noted that the disk-usage based eviction sails 1
percentage point above the configured value, which might be a source of
confusion, so it might be better to get rid of that confusion now.

confusion: "I configured 85% but pageserver sails at 86%".

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-04-06 12:47:21 +03:00
Alexander Bayandin
9310949b44 GitHub Autocomment: Retry on server errors (#3958)
Retry posting/updating a comment in case of 5XX errors from GitHub API
2023-04-05 22:08:06 +03:00
Stas Kelvich
d8df5237fa Aligne extra certificate name with default cert-manager names 2023-04-05 21:29:21 +03:00
Stas Kelvich
c3ca48c62b Support extra domain names for proxy.
Make it possible to specify directory where proxy will look up for
extra certificates. Proxy will iterate through subdirs of that directory
and load `key.pem` and `cert.pem` files from each subdir. Certs directory
structure may look like that:

  certs
  |--example.com
  |  |--key.pem
  |  |--cert.pem
  |--foo.bar
     |--key.pem
     |--cert.pem

Actual domain names are taken from certs and key, subdir names are
ignored.
2023-04-05 20:06:48 +03:00
Alexander Bayandin
957acb51b5 GitHub Autocomment: Fix the link to the latest commit (#3952) 2023-04-04 19:06:10 +03:00
Alexander Bayandin
1d23b5d1de Comment PR with test results (#3907)
This PR adds posting a comment with test results. Each workflow run
updates the comment with new results.
The layout and the information that we post can be changed to our needs,
right now, it contains failed tests and test which changes status after
rerun (i.e. flaky tests)
2023-04-04 12:22:47 +01:00
Alexander Bayandin
105b8bb9d3 test_runner: automatically rerun flaky tests (#3880)
This PR adds a plugin that automatically reruns (up to 3 times) flaky
tests. Internally, it uses data from `TEST_RESULT_CONNSTR` database and
`pytest-rerunfailures` plugin.

As the first approximation we consider the test flaky if it has failed on 
the main branch in the last 10 days.

Flaky tests are fetched by `scripts/flaky_tests.py` script (it's
possible to use it in a standalone mode to learn which tests are flaky),
stored to a JSON file, and then the file is passed to the pytest plugin.
2023-04-04 12:21:54 +01:00
Kirill Bulatov
846532112c Remove unused S3 list operation (#3936)
In S3, pageserver only lists tenants (prefixes) on S3, no other keys.
Remove the list operation from the API, since S3 impl does not seem to
work normally and not used anyway,
2023-04-03 23:44:38 +03:00
Dmitry Ivanov
f85a61ceac [proxy] Fix regression in logging
For some reason, `tracing::instrument` proc_macro doesn't always print
elements specified via `fields()` or even show that it's impossible
(e.g. there's no Display impl).

Work around this using the `?foo` notation.

Before:
2023-04-03T14:48:06.017504Z  INFO handle_client🤝 received SslRequest

After:
2023-04-03T14:51:24.424176Z  INFO handle_client{session_id=7bd07be8-3462-404e-8ccc-0a5332bf3ace}🤝 received SslRequest
2023-04-03 18:49:30 +03:00
Christian Schwarz
45bf76eb05 enable layer eviction by default in prod (#3933)
Leave disk_usage_based_eviction above the current max usage in prod
(82%ish), so that deploying this commit won't trigger
disk_usage_based_eviction.

As indicated in the TODO, we'll decrease the value to 80% later.

Also update the staging YAMLs to use the anchor syntax for
`evictions_low_residence_duration_metric_threshold` like we do in the
prod YAMLs as of this patch.
2023-04-03 14:57:36 +02:00
Joonas Koivunen
a415670bc3 feat: log evictions (#3930)
this will help log analysis with the counterpart of already logging all
remote download needs and downloads. ended up with a easily regexable
output in the final round.
2023-04-03 14:15:41 +03:00
Joonas Koivunen
cf5cfe6d71 fix: metric used for alerting threshold on staging (#3932)
This should remove the too eager alerts from staging.
2023-04-03 13:26:45 +03:00
Arseny Sher
d733bc54b8 Rename ReplicationFeedback and its fields.
This is the the feedback originating from pageserver, so change previous
confusing names to
s/ReplicationFeedback/PageserverFeedback
s/ps_writelsn/last_receive_lsn
s/ps_flushlsn/disk_consistent_lsn
s/ps_apply_lsn/remote_consistent_lsn

I haven't changed on the wire format to keep compatibility. However,
understanding of new field names is added to compute, so once all computes
receive this patch we can change the wire names as well. Safekeepers/pageservers
are deployed roughly at the same time and it is ok to live without feedbacks
during the short period, so this is not a problem there.
2023-04-03 01:52:41 +04:00
Arthur Petukhovsky
814abd9f84 Switch to safekeeper in the same AZ (#3883)
Add a condition to switch walreceiver connection to safekeeper that is
located in the same availability zone. Switch happens when commit_lsn of
a candidate is not less than commit_lsn from the active connection. This
condition is expected not to trigger instantly, because commit_lsn of a
current connection is usually greater than commit_lsn of updates from
the broker. That means that if WAL is written continuously, switch can
take a lot of time, but it should happen eventually.

Now protoc 3.15+ is required for building neon.

Fixes https://github.com/neondatabase/neon/issues/3200
2023-04-02 11:32:27 +03:00
Alexander Bayandin
75ffe34b17 check-macos-build: fix cache key (#3926)
We don't have `${{ matrix.build_type }}` there, so it gets resolved to
an empty substring and looks like this

[`v1-macOS--pg-f8a650e49b06d39ad131b860117504044b01f312-dcccd010ff851b9f72bb451f28243fa3a341f07028034bbb46ea802413b36d80`](https://github.com/neondatabase/neon/actions/runs/4575422427/jobs/8078231907#step:26:2)
2023-03-31 21:45:59 +03:00
Christian Schwarz
d2aa31f0ce fix pageserver_evictions_with_low_residence_duration metric (#3925)
It was doing the comparison in the wrong way.
2023-03-31 19:25:53 +03:00
Dmitry Rodionov
22f9ea5fe2 Remind people to clean up merge commit message in PR template (#3920) 2023-03-31 16:11:34 +03:00
Joonas Koivunen
d0711d0896 build: fix git perms for deploy job (#3921)
copy pasted from `build-neon` job. it is interesting that this is only
needed by `build-neon` and `deploy`.

Fixes:
https://github.com/neondatabase/neon/actions/runs/4568077915/jobs/8070960178
which seems to have been going for a while.
2023-03-31 16:05:15 +03:00
Arseny Sher
271f6a6e99 Always sync-safekeepers in neon_local on compute start.
Instead of checking neon.safekeepers GUC value in existing pg node data dir,
just always run sync-safekeepers when safekeepers are configured. Without this
change, creation of new compute didn't run it. That's ok for new
timeline/branch (it doesn't return anything useful anyway, and LSN is known by
pageserver), but restart of compute for existing timeline bore the risk of
getting basebackup not on the latest LSN, i.e. basically broken -- it might not
have prev_lsn, and even if it had, walproposer would complain anyway.

fixes https://github.com/neondatabase/neon/issues/2963
2023-03-31 16:15:06 +04:00
Christian Schwarz
a64dd3ecb5 disk-usage-based layer eviction (#3809)
This patch adds a pageserver-global background loop that evicts layers
in response to a shortage of available bytes in the $repo/tenants
directory's filesystem.

The loop runs periodically at a configurable `period`.

Each loop iteration uses `statvfs` to determine filesystem-level space
usage. It compares the returned usage data against two different types
of thresholds. The iteration tries to evict layers until app-internal
accounting says we should be below the thresholds. We cross-check this
internal accounting with the real world by making another `statvfs` at
the end of the iteration. We're good if that second statvfs shows that
we're _actually_ below the configured thresholds. If we're still above
one or more thresholds, we emit a warning log message, leaving it to the
operator to investigate further.

There are two thresholds:
- `max_usage_pct` is the relative available space, expressed in percent
of the total filesystem space. If the actual usage is higher, the
threshold is exceeded.
- `min_avail_bytes` is the absolute available space in bytes. If the
actual usage is lower, the threshold is exceeded.

The iteration evicts layers in LRU fashion with a reservation of up to
`tenant_min_resident_size` bytes of the most recent layers per tenant.
The layers not part of the per-tenant reservation are evicted
least-recently-used first until we're below all thresholds. The
`tenant_min_resident_size` can be overridden per tenant as
`min_resident_size_override` (bytes).

In addition to the loop, there is also an HTTP endpoint to perform one
loop iteration synchronous to the request. The endpoint takes an
absolute number of bytes that the iteration needs to evict before
pressure is relieved. The tests use this endpoint, which is a great
simplification over setting up loopback-mounts in the tests, which would
be required to test the statvfs part of the implementation. We will rely
on manual testing in staging to test the statvfs parts.

The HTTP endpoint is also handy in emergencies where an operator wants
the pageserver to evict a given amount of space _now. Hence, it's
arguments documented in openapi_spec.yml. The response type isn't
documented though because we don't consider it stable. The endpoint
should _not_ be used by Console but it could be used by on-call.

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-03-31 14:47:57 +03:00
Konstantin Knizhnik
bf46237fc2 Fix prefetch for parallel bitmap scan (#3875)
## Describe your changes

Fix prefetch for parallel bitmap scan

## Issue ticket number and link

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-03-30 22:07:19 +03:00
Lassi Pölönen
41d364a8f1 Add more detailed logging to compute_ctl's shutdown (#3915)
Currently we don't see from the logs, if shutting down tracing takes
long time or not. We do see that shutting down computes gets delayed for
some reason and hits thhe grace period limit. Moving the shutdown
message to slightly later, when we don't have anything else than just
exit left.
## Issue ticket number and link

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-03-30 22:02:39 +03:00
Christian Schwarz
fa54a57ca2 random_init_delay: remove the minimum of 10 seconds (#3914)
Before this patch, the range from which the random delay is picked is at
minimum 10 seconds.
With this patch, they delay is bounded to whatever the given `period`
is, and zero, if period id Duration::ZERO.

Motivation for this: the disk usage eviction tests that we'll add in
https://github.com/neondatabase/neon/pull/3905 need to wait for the disk
usage eviction background loop to do its job.
They set a period of 1s.
It seems wasteful to wait 10 seconds in the tests.

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-03-30 18:38:45 +02:00
Lassi Pölönen
1c1bb904ed Rename zenith_* labels to neon_* (#3911)
## Describe your changes
Get rid of the legacy labeling. Aslo `neon_region_slug` with the same
value as `neon_region` doesn't make much sense, so just drop it. This
allows us to drop the relabeling from zenith to neon in the log
collector.
2023-03-30 16:24:47 +03:00
Gleb Novikov
b26c837ed6 Fixed pageserver openapi spec properties reference (#3904)
## Describe your changes

In [this linter
run](https://github.com/neondatabase/cloud/actions/runs/4553032319/jobs/8029101300?pr=4391)
accidentally found out that spec is invalid. Reference other schemas in
properties should be done the way I changed.

Could not find documentation specifically for schemas embedding in
`components.schemas`, but it seems like the approach is inherited from
json schema:
https://json-schema.org/understanding-json-schema/structuring.html#ref

## Issue ticket number and link
-

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [ ] ~If it is a core feature, I have added thorough tests.~
- [ ] ~Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?~
- [ ] ~If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.~
2023-03-29 19:18:44 +04:00
Kirill Bulatov
ac9c7e8c4a Replace pin! from tokio to the std one (#3903)
With fresh rustc brought by
https://github.com/neondatabase/neon/pull/3902, we can use
`std::pin::pin!` macro instead of the tokio one.
One place did not need the macro at all, other places were adjusted.
2023-03-29 14:14:56 +03:00
Vadim Kharitonov
f1b174dc6a Update rust version to 1.68.2 2023-03-29 12:50:04 +04:00
Kirill Bulatov
9d714a8413 Split $CARGO_FLAGS and $CARGO_FEATURES to make e2e tests work 2023-03-29 00:08:30 +03:00
Kirill Bulatov
6c84cbbb58 Run new Rust IT test in CI 2023-03-29 00:08:30 +03:00
Kirill Bulatov
1300dc9239 Replace Python IT test with the Rust one 2023-03-29 00:08:30 +03:00
Kirill Bulatov
018c8b0e2b Use proper tokens and delimeters when listing S3 2023-03-29 00:08:30 +03:00
Arseny Sher
b52389f228 Cleanly exit on any shutdown signal in storage_broker.
neon_local sends SIGQUIT, which otherwise dumps core by default. Also, remove
obsolete install_shutdown_handlers; in all binaries it was overridden by
ShutdownSignals::handle later.

ref https://github.com/neondatabase/neon/issues/3847
2023-03-28 22:29:42 +04:00
Heikki Linnakangas
5a123b56e5 Remove obsolete hack to rename neon-specific GUCs.
I checked the console database, we don't have any of these left in
production.
2023-03-28 17:57:22 +03:00
Arthur Petukhovsky
7456e5b71c Add script to collect state from safekeepers (#3835)
Add an ansible script to collect
https://github.com/neondatabase/neon/pull/3710 state JSON from all
safekeeper nodes and upload them to a postgres table.
2023-03-28 17:04:02 +03:00
Konstantin Knizhnik
9798737ec6 Update pgxn/neon/file_cache.c
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-03-28 14:43:34 +04:00
Konstantin Knizhnik
35ecb139dc Use stavfs instead inof statfs to fix MacOS build 2023-03-28 14:43:34 +04:00
Arseny Sher
278d0f117d Rename neon_local sk logs s/safekeeper 1.log/safekeeper-1.log.
I don't like spaces in file names.
2023-03-28 14:28:56 +04:00
Arseny Sher
c30b9e6eb1 Show full path to pg_ctl invokation when it fails. 2023-03-28 12:06:06 +04:00
Konstantin Knizhnik
82a4777046 Add local free space monitor (#3832)
## Describe your changes

Monitor free spae in local file system and shrink local file cache size
if it is under watermark.
Neon is using local storage for temp files (temp table + intermediate
results), unlogged relations
and local file cache.

Ideally all space not used for temporary files should be used for local
file cache.
Temporary files and even unlogged relation are intended to have small
life time (because
them can be lost at  any moment in case of compute restart).

So the policy is to overcommit local cache size and shrink it if there
is not enough free space.
As far as temporary files are expected to be needed for a short time,
there i no need
to permanently shrink local file cache size. Instead of it, we just
throw away least recently accessed elements
from local file cache, releasing some space on the local disk.

## Issue ticket number and link

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

---------

Co-authored-by: sharnoff <sharnoff@neon.tech>
2023-03-28 08:27:50 +03:00
Dmitry Rodionov
6efea43449 Use precondition failed code in delete_timeline when tenant is missing (#3884)
This allows client to differentiate between missing tenant and missing
timeline cases
2023-03-27 21:01:46 +03:00
Joonas Koivunen
f14895b48e eviction: avoid post-restart download by synthetic_size (#3871)
As of #3867, we do artificial layer accesses to layers that will be
needed after the next restart, but not until then because of caches.

With this patch, we also do that for the accesses that the synthetic
size calculation worker does if consumption metrics are enabled.

The actual size calculation is not of importance, but we need to
calculate all of the sizes, so we only call tenant::size::gather_inputs.

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-03-27 19:20:23 +02:00
125 changed files with 3291 additions and 1498 deletions

View File

@@ -15,10 +15,32 @@ outputs:
report-url:
description: 'Allure report URL'
value: ${{ steps.generate-report.outputs.report-url }}
report-json-url:
description: 'Allure report JSON URL'
value: ${{ steps.generate-report.outputs.report-json-url }}
runs:
using: "composite"
steps:
# We're using some of env variables quite offen, so let's set them once.
#
# It would be nice to have them set in common runs.env[0] section, but it doesn't work[1]
#
# - [0] https://docs.github.com/en/actions/creating-actions/metadata-syntax-for-github-actions#runsenv
# - [1] https://github.com/neondatabase/neon/pull/3907#discussion_r1154703456
#
- name: Set common environment variables
shell: bash -euxo pipefail {0}
run: |
echo "BUILD_TYPE=${BUILD_TYPE}" >> $GITHUB_ENV
echo "BUCKET=${BUCKET}" >> $GITHUB_ENV
echo "TEST_OUTPUT=${TEST_OUTPUT}" >> $GITHUB_ENV
env:
BUILD_TYPE: ${{ inputs.build_type }}
BUCKET: neon-github-public-dev
TEST_OUTPUT: /tmp/test_output
- name: Validate input parameters
shell: bash -euxo pipefail {0}
run: |
@@ -76,16 +98,14 @@ runs:
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.19.0
ALLURE_ZIP_MD5: ced21401a1a8b9dfb68cee9e4c210464
ALLURE_VERSION: 2.21.0
ALLURE_ZIP_MD5: c8db4dd8e2a7882583d569ed2c82879c
- name: Upload Allure results
if: ${{ inputs.action == 'store' }}
env:
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
TEST_OUTPUT: /tmp/test_output
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
shell: bash -euxo pipefail {0}
run: |
@@ -104,7 +124,7 @@ runs:
EOF
cat <<EOF > $TEST_OUTPUT/allure/results/environment.properties
TEST_SELECTION=${{ inputs.test_selection }}
BUILD_TYPE=${{ inputs.build_type }}
BUILD_TYPE=${BUILD_TYPE}
EOF
ARCHIVE="${GITHUB_RUN_ID}-${TEST_SELECTION}-${GITHUB_RUN_ATTEMPT}-$(date +%s).tar.zst"
@@ -113,13 +133,12 @@ runs:
tar -C ${TEST_OUTPUT}/allure/results -cf ${ARCHIVE} --zstd .
aws s3 mv --only-show-errors ${ARCHIVE} "s3://${BUCKET}/${RAW_PREFIX}/${ARCHIVE}"
# Potentially we could have several running build for the same key (for example for the main branch), so we use improvised lock for this
# Potentially we could have several running build for the same key (for example for the main branch), so we use improvised lock for this
- name: Acquire Allure lock
if: ${{ inputs.action == 'generate' }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
LOCK_TIMEOUT=300 # seconds
@@ -149,8 +168,6 @@ runs:
env:
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
TEST_OUTPUT: /tmp/test_output
BUCKET: neon-github-public-dev
shell: bash -euxo pipefail {0}
run: |
# Get previously uploaded data for this run
@@ -186,24 +203,24 @@ runs:
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html
# Generate redirect
cat <<EOF > ./index.html
cat <<EOF > ${TEST_OUTPUT}/allure/index.html
<!DOCTYPE html>
<meta charset="utf-8">
<title>Redirecting to ${REPORT_URL}</title>
<meta http-equiv="refresh" content="0; URL=${REPORT_URL}">
EOF
aws s3 cp --only-show-errors ./index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
aws s3 cp --only-show-errors ${TEST_OUTPUT}/allure/index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY}
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
- name: Release Allure lock
if: ${{ inputs.action == 'generate' && always() }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt || exit 0
@@ -212,11 +229,16 @@ runs:
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi
- name: Cleanup
if: always()
shell: bash -euxo pipefail {0}
run: |
rm -rf ${TEST_OUTPUT}/allure
- uses: actions/github-script@v6
if: ${{ inputs.action == 'generate' && always() }}
env:
REPORT_URL: ${{ steps.generate-report.outputs.report-url }}
BUILD_TYPE: ${{ inputs.build_type }}
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
with:
script: |

View File

@@ -14,6 +14,12 @@ inputs:
api_host:
desctiption: 'Neon API host'
default: console.stage.neon.tech
provisioner:
desctiption: 'k8s-pod or k8s-neonvm'
default: 'k8s-pod'
compute_units:
desctiption: '[Min, Max] compute units; Min and Max are used for k8s-neonvm with autoscaling, for k8s-pod values Min and Max should be equal'
default: '[1, 1]'
outputs:
dsn:
@@ -31,6 +37,10 @@ runs:
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
if [ "${PROVISIONER}" == "k8s-pod" ] && [ "${MIN_CU}" != "${MAX_CU}" ]; then
echo >&2 "For k8s-pod provisioner MIN_CU should be equal to MAX_CU"
fi
project=$(curl \
"https://${API_HOST}/api/v2/projects" \
--fail \
@@ -42,6 +52,9 @@ runs:
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"${PROVISIONER}\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": { }
}
}")
@@ -62,3 +75,6 @@ runs:
API_KEY: ${{ inputs.api_key }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
PROVISIONER: ${{ inputs.provisioner }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}

View File

@@ -44,6 +44,10 @@ inputs:
description: 'Secret access key'
required: false
default: ''
rerun_flaky:
description: 'Whether to rerun flaky tests'
required: false
default: 'false'
runs:
using: "composite"
@@ -101,6 +105,7 @@ runs:
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg14
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report
@@ -143,6 +148,13 @@ runs:
EXTRA_PARAMS="--out-dir $PERF_REPORT_DIR $EXTRA_PARAMS"
fi
if [ "${RERUN_FLAKY}" == "true" ]; then
mkdir -p $TEST_OUTPUT
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/flaky.json"
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then

View File

@@ -8,6 +8,16 @@ storage:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -8,6 +8,16 @@ storage:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -8,6 +8,16 @@ storage:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -8,6 +8,16 @@ storage:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 85 # TODO: decrease to 80 after all pageservers are below 80
min_avail_bytes: 0
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "10m"
threshold: &default_eviction_threshold "24h"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -3,6 +3,8 @@
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
INSTANCE_TYPE=$(curl -s http://169.254.169.254/latest/meta-data/instance-type)
DISK_SIZE=$(df -B1 /storage | tail -1 | awk '{print $2}')
# store fqdn hostname in var
HOST=$(hostname -f)
@@ -18,7 +20,9 @@ cat <<EOF | tee /tmp/payload
"http_host": "${HOST}",
"http_port": 9898,
"active": false,
"availability_zone_id": "${AZ_ID}"
"availability_zone_id": "${AZ_ID}",
"disk_size": ${DISK_SIZE},
"instance_type": "${INSTANCE_TYPE}"
}
EOF

View File

@@ -10,17 +10,14 @@ storage:
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
# TODO: learn typical resident-size growth rate [GiB/minute] and configure
# min_avail_bytes such that we have X minutes of headroom.
min_avail_bytes: 0
# We assume that the worst-case growth rate is small enough that we can
# catch above-threshold conditions by checking every 10s.
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -10,17 +10,14 @@ storage:
metric_collection_interval: 10min
disk_usage_based_eviction:
max_usage_pct: 80
# TODO: learn typical resident-size growth rate [GiB/minute] and configure
# min_avail_bytes such that we have X minutes of headroom.
min_avail_bytes: 0
# We assume that the worst-case growth rate is small enough that we can
# catch above-threshold conditions by checking every 10s.
period: "10s"
tenant_config:
eviction_policy:
kind: "LayerAccessThreshold"
period: "20m"
threshold: "20m"
threshold: &default_eviction_threshold "20m"
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
remote_storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"

View File

@@ -30,10 +30,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: dev
zenith_region: eu-west-1
zenith_region_slug: eu-west-1
neon_service: proxy-scram
neon_env: dev
neon_region: eu-west-1
exposedService:
annotations:

View File

@@ -15,10 +15,9 @@ settings:
# -- Additional labels for neon-proxy-link pods
podLabels:
zenith_service: proxy
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
neon_service: proxy
neon_env: dev
neon_region: us-east-2
service:
type: LoadBalancer

View File

@@ -15,10 +15,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram-legacy
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
neon_service: proxy-scram-legacy
neon_env: dev
neon_region: us-east-2
exposedService:
annotations:

View File

@@ -23,6 +23,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"
@@ -30,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
neon_service: proxy-scram
neon_env: dev
neon_region: us-east-2
exposedService:
annotations:

View File

@@ -31,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_region: ap-southeast-1
zenith_region_slug: ap-southeast-1
neon_service: proxy-scram
neon_env: prod
neon_region: ap-southeast-1
exposedService:
annotations:

View File

@@ -31,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_region: eu-central-1
zenith_region_slug: eu-central-1
neon_service: proxy-scram
neon_env: prod
neon_region: eu-central-1
exposedService:
annotations:

View File

@@ -13,10 +13,9 @@ settings:
# -- Additional labels for zenith-proxy pods
podLabels:
zenith_service: proxy
zenith_env: production
zenith_region: us-east-2
zenith_region_slug: us-east-2
neon_service: proxy
neon_env: production
neon_region: us-east-2
service:
type: LoadBalancer

View File

@@ -31,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-east-2
zenith_region_slug: us-east-2
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-2
exposedService:
annotations:

View File

@@ -31,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-west-2
zenith_region_slug: us-west-2
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
exposedService:
annotations:

View File

@@ -31,10 +31,9 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-west-2
zenith_region_slug: us-west-2
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
exposedService:
annotations:

View File

@@ -3,8 +3,12 @@
## Issue ticket number and link
## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section.
## Checklist before merging
- [ ] Do not forget to reformat commit message to not include the above checklist

View File

@@ -111,6 +111,7 @@ jobs:
strategy:
fail-fast: false
matrix:
# neon-captest-freetier: Run pgbench with freetier-limited compute
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
@@ -120,6 +121,9 @@ jobs:
db_size: [ 10gb ]
runner: [ us-east-2 ]
include:
- platform: neon-captest-freetier
db_size: 3gb
runner: us-east-2
- platform: neon-captest-prefetch
db_size: 50gb
runner: us-east-2
@@ -160,13 +164,14 @@ jobs:
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: contains(fromJson('["neon-captest-new", "neon-captest-prefetch"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-prefetch", "neon-captest-freetier"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
- name: Set up Connection String
id: set-up-connstr
@@ -175,7 +180,7 @@ jobs:
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neon-captest-new | neon-captest-prefetch)
neon-captest-new | neon-captest-prefetch | neon-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -185,7 +190,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-prefetch', neon-captest-freetier, 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac

View File

@@ -184,10 +184,10 @@ jobs:
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked $CARGO_FEATURES"
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release $CARGO_FEATURES"
CARGO_FLAGS="--locked --release"
fi
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV
echo "CARGO_FEATURES=${CARGO_FEATURES}" >> $GITHUB_ENV
@@ -240,11 +240,18 @@ jobs:
- name: Run cargo build
run: |
${cov_prefix} mold -run cargo build $CARGO_FLAGS --bins --tests
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run cargo test
run: |
${cov_prefix} cargo test $CARGO_FLAGS
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test pagination_tests -- s3_pagination_should_work --exact
- name: Install rust binaries
run: |
@@ -268,7 +275,7 @@ jobs:
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS --message-format=json --no-run |
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
for bin in $test_exe_paths; do
@@ -328,6 +335,10 @@ jobs:
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
rerun_flaky: true
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
@@ -364,42 +375,90 @@ jobs:
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
merge-allure-report:
create-test-report:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ regress-tests, benchmarks ]
if: ${{ !cancelled() }}
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: false
- name: Create Allure report
id: create-allure-report
steps:
- uses: actions/checkout@v3
- name: Create Allure report (debug)
if: ${{ !cancelled() }}
id: create-allure-report-debug
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ matrix.build_type }}
build_type: debug
- name: Create Allure report (release)
if: ${{ !cancelled() }}
id: create-allure-report-release
uses: ./.github/actions/allure-report
with:
action: generate
build_type: release
- uses: actions/github-script@v6
if: >
!cancelled() &&
github.event_name == 'pull_request' && (
steps.create-allure-report-debug.outputs.report-url ||
steps.create-allure-report-release.outputs.report-url
)
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const reports = [{
buildType: "debug",
reportUrl: "${{ steps.create-allure-report-debug.outputs.report-url }}",
jsonUrl: "${{ steps.create-allure-report-debug.outputs.report-json-url }}",
}, {
buildType: "release",
reportUrl: "${{ steps.create-allure-report-release.outputs.report-url }}",
jsonUrl: "${{ steps.create-allure-report-release.outputs.report-json-url }}",
}]
const script = require("./scripts/pr-comment-test-report.js")
await script({
github,
context,
fetch,
reports,
})
- name: Store Allure test stat in the DB
if: ${{ steps.create-allure-report.outputs.report-url }}
if: >
!cancelled() && (
steps.create-allure-report-debug.outputs.report-url ||
steps.create-allure-report-release.outputs.report-url
)
env:
BUILD_TYPE: ${{ matrix.build_type }}
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
REPORT_URL: ${{ steps.create-allure-report.outputs.report-url }}
REPORT_JSON_URL_DEBUG: ${{ steps.create-allure-report-debug.outputs.report-json-url }}
REPORT_JSON_URL_RELEASE: ${{ steps.create-allure-report-release.outputs.report-json-url }}
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
run: |
curl --fail --output suites.json ${REPORT_URL%/index.html}/data/suites.json
./scripts/pysync
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
for report_url in $REPORT_JSON_URL_DEBUG $REPORT_JSON_URL_RELEASE; do
if [ -z "$report_url" ]; then
continue
fi
if [[ "$report_url" == "$REPORT_JSON_URL_DEBUG" ]]; then
BUILD_TYPE=debug
else
BUILD_TYPE=release
fi
curl --fail --output suites.json "${report_url}"
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
done
coverage-report:
runs-on: [ self-hosted, gen3, small ]
@@ -891,6 +950,16 @@ jobs:
needs: [ push-docker-hub, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Checkout
uses: actions/checkout@v3
with:

View File

@@ -53,14 +53,14 @@ jobs:
uses: actions/cache@v3
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v3
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ matrix.build_type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Set extra env for macOS
run: |

22
Cargo.lock generated
View File

@@ -3086,6 +3086,7 @@ dependencies = [
"serde",
"serde_json",
"tempfile",
"test-context",
"tokio",
"tokio-util",
"toml_edit",
@@ -3889,6 +3890,27 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "test-context"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "055831a02a4f5aa28fede67f2902014273eb8c21b958ac5ebbd59b71ef30dbc3"
dependencies = [
"async-trait",
"futures",
"test-context-macros",
]
[[package]]
name = "test-context-macros"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8901a55b0a7a06ebc4a674dcca925170da8e613fa3b163a1df804ed10afb154d"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "textwrap"
version = "0.16.0"

View File

@@ -97,6 +97,7 @@ strum_macros = "0.24"
svg_fmt = "0.4.1"
sync_wrapper = "0.1.2"
tar = "0.4"
test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.6", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }

View File

@@ -38,6 +38,7 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_stat_statements.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
@@ -300,6 +301,27 @@ RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.3.2.tar.gz
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plpgsql_check.control
#########################################################################################
#
# Layer "timescaledb-pg-build"
# compile timescaledb extension
#
#########################################################################################
FROM build-deps AS timescaledb-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN apt-get update && \
apt-get install -y cmake && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
cd build && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/timescaledb.control
#########################################################################################
#
# Layer "rust extensions"
@@ -404,6 +426,7 @@ COPY --from=pgtap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=prefix-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -40,6 +40,8 @@ pacman -S base-devel readline zlib libseccomp openssl clang \
postgresql-libs cmake postgresql protobuf
```
Building Neon requires 3.15+ version of `protoc` (protobuf-compiler). If your distribution provides an older version, you can install a newer version from [here](https://github.com/protocolbuffers/protobuf/releases).
2. [Install Rust](https://www.rust-lang.org/tools/install)
```
# recommended approach from https://www.rust-lang.org/tools/install

View File

@@ -34,13 +34,14 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_tools::http::api::launch_http_server;
@@ -49,7 +50,6 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -62,7 +62,7 @@ fn main() -> Result<()> {
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec = matches.get_one::<String>("spec");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
@@ -71,40 +71,107 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec: ComputeSpec = match spec {
let mut spec = Default::default();
let mut spec_set = false;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => serde_json::from_str(json)?,
Some(json) => {
spec = serde_json::from_str(json)?;
spec_set = true;
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
spec = serde_json::from_reader(file)?;
spec_set = true;
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = s;
spec_set = true;
}
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
panic!("must specify both --control-plane-uri and --compute-id or none");
}
} else {
panic!("compute spec should be provided via --spec or --spec-path argument");
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
}
}
};
let mut new_state = ComputeState::new();
if spec_set {
new_state.spec = spec;
}
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: Mutex::new(new_state),
state_changed: Condvar::new(),
};
let compute = Arc::new(compute_node);
// Launch http service first, so we were able to serve control-plane
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
let mut state = compute.state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
break;
}
}
}
// We got all we need, fill in the state.
let mut state = compute.state.lock().unwrap();
let pageserver_connstr = state
.spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = state.spec.storage_auth_token.clone();
let tenant = state
.spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = state
.spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let startup_tracing_context = state.spec.startup_tracing_context.clone();
state.pageserver_connstr = pageserver_connstr;
state.storage_auth_token = storage_auth_token;
state.tenant = tenant;
state.timeline = timeline;
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
@@ -120,7 +187,7 @@ fn main() -> Result<()> {
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
let startup_context_guard = if let Some(ref carrier) = spec.startup_tracing_context {
let startup_context_guard = if let Some(ref carrier) = startup_tracing_context {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
Some(TraceContextPropagator::new().extract(carrier).attach())
@@ -128,41 +195,7 @@ fn main() -> Result<()> {
None
};
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,
tenant,
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);
// Launch service threads first, so we were able to serve availability
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
// Start Postgres
@@ -172,7 +205,7 @@ fn main() -> Result<()> {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut state = compute.state.write().unwrap();
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
@@ -203,13 +236,14 @@ fn main() -> Result<()> {
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
thread::sleep(Duration::from_secs(30));
info!("shutting down");
}
info!("shutting down tracing");
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();
info!("shutting down");
exit(exit_code.unwrap_or(1))
}
@@ -261,7 +295,7 @@ fn cli() -> clap::Command {
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
}

View File

@@ -20,12 +20,12 @@ use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::sync::{Condvar, Mutex};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::{Serialize, Serializer};
use serde::Serialize;
use tokio_postgres;
use tracing::{info, instrument, warn};
@@ -41,41 +41,52 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub metrics: ComputeMetrics,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
/// - we start compute with some spec provided as argument
/// - we push new spec and it does reconfiguration
/// - but then something happens and compute pod / VM is destroyed,
/// so k8s controller starts it again with the **old** spec
/// and the same for empty computes:
/// - we started compute without any spec
/// - we push spec and it does configuration
/// - but then it is restarted without any spec again
pub live_config_allowed: bool,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// To allow HTTP API server to serving status requests, while configuration
/// is in progress, lock should be held only for short periods of time to do
/// read/write, not the whole configuration process.
pub state: Mutex<ComputeState>,
/// `Condvar` to allow notifying waiters about state changes.
pub state_changed: Condvar,
}
#[derive(Clone, Debug)]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
pub error: Option<String>,
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub metrics: ComputeMetrics,
/// Volatile part of the `ComputeNode` so should be used under `RwLock`
/// to allow HTTP API server to serve status requests, while configuration
/// is in progress.
pub state: RwLock<ComputeState>,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Init,
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
spec: ComputeSpec::default(),
tenant: String::new(),
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
}
}
}
@@ -86,11 +97,22 @@ impl Default for ComputeState {
}
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// provided by control-plane.
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
}
@@ -104,11 +126,13 @@ pub struct ComputeMetrics {
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
self.state.write().unwrap().status = status;
let mut state = self.state.lock().unwrap();
state.status = status;
self.state_changed.notify_all();
}
pub fn get_status(&self) -> ComputeStatus {
self.state.read().unwrap().status
self.state.lock().unwrap().status
}
// Remove `pgdata` directory and create it again with right permissions.
@@ -124,15 +148,15 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self))]
fn get_basebackup(&self, lsn: &str) -> Result<()> {
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&self.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &self.storage_auth_token {
if let Some(storage_auth_token) = &compute_state.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -141,8 +165,14 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn),
"0/0" => format!(
"basebackup {} {}",
&compute_state.tenant, &compute_state.timeline
), // First start of the compute
_ => format!(
"basebackup {} {} {}",
&compute_state.tenant, &compute_state.timeline, lsn
),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -169,14 +199,14 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self))]
fn sync_safekeepers(&self) -> Result<String> {
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -217,9 +247,9 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self))]
pub fn prepare_pgdata(&self) -> Result<()> {
let spec = &self.spec;
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = &compute_state.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
@@ -228,18 +258,18 @@ impl ComputeNode {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers()
.sync_safekeepers(compute_state.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
);
self.get_basebackup(&lsn).with_context(|| {
self.get_basebackup(compute_state, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
)
})?;
@@ -252,13 +282,16 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip(self))]
pub fn start_postgres(&self) -> Result<std::process::Child> {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = Command::new(&self.pgbin)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -271,8 +304,9 @@ impl ComputeNode {
Ok(pg)
}
#[instrument(skip(self))]
pub fn apply_config(&self) -> Result<()> {
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self, compute_state))]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
@@ -303,19 +337,19 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
handle_grants(self, &mut client)?;
handle_roles(&compute_state.spec, &mut client)?;
handle_databases(&compute_state.spec, &mut client)?;
handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&self.spec, &mut client)?;
handle_extensions(&compute_state.spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
self.spec.cluster.cluster_id
compute_state.spec.cluster.cluster_id
);
Ok(())
@@ -323,21 +357,22 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
compute_state.spec.cluster.cluster_id,
compute_state.spec.operation_uuid.as_ref().unwrap(),
compute_state.tenant,
compute_state.timeline,
);
self.prepare_pgdata()?;
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres()?;
let pg = self.start_postgres(compute_state.storage_auth_token.clone())?;
self.apply_config()?;
self.apply_config(&compute_state)?;
let startup_end_time = Utc::now();
self.metrics.config_ms.store(

View File

@@ -3,12 +3,16 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::ComputeNode;
use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::requests::ConfigurationRequest;
use crate::http::responses::{ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use num_cpus;
use serde_json;
use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
@@ -23,8 +27,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.read().unwrap();
Response::new(Body::from(serde_json::to_string(&*state).unwrap()))
let state = compute.state.lock().unwrap();
let status_response = ComputeStatusResponse::from(state.clone());
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
}
// Startup metrics in JSON format. Keep /metrics reserved for a possible
@@ -37,12 +43,29 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!("compute is not running, current status: {:?}", status);
error!(msg);
return Response::new(Body::from(msg));
}
let insights = compute.collect_insights().await;
Response::new(Body::from(insights))
}
(&Method::POST, "/check_writability") => {
info!("serving /check_writability POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for check_writability request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
@@ -61,6 +84,23 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
))
}
// Accept spec in JSON format and request compute configuration. If
// anything goes wrong after we set the compute status to `ConfigurationPending`
// and update compute state with new spec, we basically leave compute
// in the potentially wrong state. That said, it's control-plane's
// responsibility to watch compute state after reconfiguration request
// and to clean restart in case of errors.
(&Method::POST, "/configure") => {
info!("serving /configure POST request");
match handle_configure_request(req, compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /configure request: {msg}");
render_json_error(&msg, code)
}
}
}
// Return the `404 Not Found` for any other routes.
_ => {
let mut not_found = Response::new(Body::from("404 Not Found"));
@@ -70,6 +110,88 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
async fn handle_configure_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
if !compute.live_config_allowed {
return Err((
"live configuration is not allowed for this compute node".to_string(),
StatusCode::PRECONDITION_FAILED,
));
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
// XXX: wrap state update under lock in code blocks. Otherwise,
// we will try to `Send` `mut state` into the spawned thread
// bellow, which will cause error:
// ```
// error: future cannot be sent between threads safely
// ```
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty {
let msg = format!(
"invalid compute status for configuration request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.spec = spec;
state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
drop(state);
info!("set new spec and notified waiters");
}
// Spawn a blocking thread to wait for compute to become Running.
// This is needed to do not block the main pool of workers and
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
if state.status == ComputeStatus::Failed {
let err = state.error.clone().unwrap_or("unknown error".to_string());
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
}
Ok(())
})
.await
.unwrap()?;
// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = ComputeStatusResponse::from(state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
}
}
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
let error = GenericAPIError {
error: e.to_string(),
};
Response::builder()
.status(status)
.body(Body::from(serde_json::to_string(&error).unwrap()))
.unwrap()
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(state: Arc<ComputeNode>) {

View File

@@ -1 +1,3 @@
pub mod api;
pub mod requests;
pub mod responses;

View File

@@ -11,7 +11,7 @@ paths:
get:
tags:
- Info
summary: Get compute node internal status
summary: Get compute node internal status.
description: ""
operationId: getComputeStatus
responses:
@@ -26,7 +26,7 @@ paths:
get:
tags:
- Info
summary: Get compute node startup metrics in JSON format
summary: Get compute node startup metrics in JSON format.
description: ""
operationId: getComputeMetricsJSON
responses:
@@ -41,9 +41,9 @@ paths:
get:
tags:
- Info
summary: Get current compute insights in JSON format
summary: Get current compute insights in JSON format.
description: |
Note, that this doesn't include any historical data
Note, that this doesn't include any historical data.
operationId: getComputeInsights
responses:
200:
@@ -56,12 +56,12 @@ paths:
/info:
get:
tags:
- "info"
summary: Get info about the compute Pod/VM
- Info
summary: Get info about the compute pod / VM.
description: ""
operationId: getInfo
responses:
"200":
200:
description: Info
content:
application/json:
@@ -72,7 +72,7 @@ paths:
post:
tags:
- Check
summary: Check that we can write new data on this compute
summary: Check that we can write new data on this compute.
description: ""
operationId: checkComputeWritability
responses:
@@ -82,9 +82,64 @@ paths:
text/plain:
schema:
type: string
description: Error text or 'true' if check passed
description: Error text or 'true' if check passed.
example: "true"
/configure:
post:
tags:
- Configure
summary: Perform compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
operationId: configureCompute
requestBody:
description: Configuration request.
required: true
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
400:
description: Provided spec is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
412:
description: |
It's not possible to do live-configuration of the compute.
It's either in the wrong state, or compute doesn't use pull
mode of configuration.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: |
Compute configuration request was processed, but error
occurred. Compute will likely shutdown soon.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:
@@ -95,7 +150,7 @@ components:
schemas:
ComputeMetrics:
type: object
description: Compute startup metrics
description: Compute startup metrics.
required:
- sync_safekeepers_ms
- basebackup_ms
@@ -113,7 +168,7 @@ components:
Info:
type: object
description: Information about VM/Pod
description: Information about VM/Pod.
required:
- num_cpus
properties:
@@ -130,17 +185,26 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format
description: The last detected compute activity timestamp in UTC and RFC3339 format.
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any
description: Text of the error during compute startup, if any.
example: ""
tenant:
type: string
description: Identifier of the current tenant served by compute node, if any.
example: c9269c359e9a199fad1ea0981246a78f
timeline:
type: string
description: Identifier of the current timeline served by compute node, if any.
example: ece7de74d4b8cbe5433a68ce4d1b97b4
ComputeInsights:
type: object
properties:
pg_stat_statements:
description: Contains raw output from pg_stat_statements in JSON format
description: Contains raw output from pg_stat_statements in JSON format.
type: array
items:
type: object
@@ -151,6 +215,19 @@ components:
- init
- failed
- running
example: running
#
# Errors
#
GenericError:
type: object
required:
- error
properties:
error:
type: string
security:
- JWT: []

View File

@@ -0,0 +1,11 @@
use serde::Deserialize;
use crate::spec::ComputeSpec;
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}

View File

@@ -0,0 +1,40 @@
use serde::{Serialize, Serializer};
use chrono::{DateTime, Utc};
use crate::compute::{ComputeState, ComputeStatus};
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: String,
pub timeline: String,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
impl From<ComputeState> for ComputeStatusResponse {
fn from(state: ComputeState) -> Self {
ComputeStatusResponse {
tenant: state.tenant,
timeline: state.timeline,
status: state.status,
last_active: state.last_active,
error: state.error,
}
}
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}

View File

@@ -46,7 +46,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);
let mut last_active = compute.state.read().unwrap().last_active;
let mut last_active = compute.state.lock().unwrap().last_active;
if let Ok(backs) = backends {
let mut idle_backs: Vec<DateTime<Utc>> = vec![];
@@ -87,7 +87,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
// Update the last activity in the shared state if we got a more recent one.
let mut state = compute.state.write().unwrap();
let mut state = compute.state.lock().unwrap();
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);

View File

@@ -17,7 +17,7 @@ const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // mil
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -26,7 +26,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -36,7 +36,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
@@ -74,18 +74,9 @@ impl GenericOption {
/// Represent `GenericOption` as configuration option.
pub fn to_pg_setting(&self) -> String {
if let Some(val) = &self.value {
// TODO: check in the console DB that we don't have these settings
// set for any non-deleted project and drop this override.
let name = match self.name.as_str() {
"safekeepers" => "neon.safekeepers",
"wal_acceptor_reconnect" => "neon.safekeeper_reconnect_timeout",
"wal_acceptor_connection_timeout" => "neon.safekeeper_connection_timeout",
it => it,
};
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", name, escape_conf_value(val)),
_ => format!("{} = {}", name, val),
"string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
_ => format!("{} = {}", self.name, val),
}
} else {
self.name.to_owned()

View File

@@ -8,14 +8,13 @@ use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::compute::ComputeNode;
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
@@ -31,7 +30,7 @@ pub struct ComputeSpec {
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
@@ -47,13 +46,36 @@ pub struct Cluster {
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
info!("getting spec from control plane: {}", cp_uri);
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
Ok(spec)
}
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
@@ -226,8 +248,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Reassign all dependent objects and delete requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
if let Some(ops) = &node.spec.delta_operations {
pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
if let Some(ops) = &spec.delta_operations {
// First, reassign all dependent objects to db owners.
info!("reassigning dependent objects of to-be-deleted roles");
@@ -244,7 +266,7 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
// Check that role is still present in Postgres, as this could be a
// restart with the same spec after role deletion.
if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
reassign_owned_objects(node, &op.name)?;
reassign_owned_objects(spec, connstr, &op.name)?;
}
}
@@ -268,10 +290,10 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases {
if db.owner != *role_name {
let mut conf = Config::from_str(node.connstr.as_str())?;
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut client = conf.connect(NoTls)?;
@@ -416,9 +438,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
@@ -450,8 +470,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.
for db in &node.spec.cluster.databases {
let mut conf = Config::from_str(node.connstr.as_str())?;
for db in &spec.cluster.databases {
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut db_client = conf.connect(NoTls)?;

View File

@@ -90,7 +90,6 @@ impl ComputeControlPlane {
timeline_id,
lsn,
tenant_id,
uses_wal_proposer: false,
pg_version,
});
@@ -115,7 +114,6 @@ pub struct PostgresNode {
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
uses_wal_proposer: bool,
pg_version: u32,
}
@@ -149,7 +147,6 @@ impl PostgresNode {
let port: u16 = conf.parse_field("port", &context)?;
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
let uses_wal_proposer = conf.get("neon.safekeepers").is_some();
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
@@ -172,7 +169,6 @@ impl PostgresNode {
timeline_id,
lsn: recovery_target_lsn,
tenant_id,
uses_wal_proposer,
pg_version,
})
}
@@ -364,7 +360,7 @@ impl PostgresNode {
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if self.uses_wal_proposer {
} else if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
@@ -403,7 +399,7 @@ impl PostgresNode {
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
let mut cmd = Command::new(&pg_ctl_path);
cmd.args(
[
&[
@@ -432,7 +428,9 @@ impl PostgresNode {
cmd.env("NEON_AUTH_TOKEN", token);
}
let pg_ctl = cmd.output().context("pg_ctl failed")?;
let pg_ctl = cmd
.output()
.context(format!("{} failed", pg_ctl_path.display()))?;
if !pg_ctl.status.success() {
anyhow::bail!(
"pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",

View File

@@ -156,7 +156,7 @@ impl SafekeeperNode {
}
background_process::start_process(
&format!("safekeeper {id}"),
&format!("safekeeper-{id}"),
&datadir,
&self.env.safekeeper_bin(),
&args,

View File

@@ -293,6 +293,9 @@ impl FeStartupPacket {
// We shouldn't advance `buf` as probably full message is not there yet,
// so can't directly use Bytes::get_u32 etc.
let len = (&buf[0..4]).read_u32::<BigEndian>().unwrap() as usize;
// The proposed replacement is `!(4..=MAX_STARTUP_PACKET_LENGTH).contains(&len)`
// which is less readable
#[allow(clippy::manual_range_contains)]
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",
@@ -936,35 +939,40 @@ impl<'a> BeMessage<'a> {
}
}
// Neon extension of postgres replication protocol
// See NEON_STATUS_UPDATE_TAG_BYTE
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
// Parts of StandbyStatusUpdate we resend to compute via safekeeper
pub ps_writelsn: u64,
pub ps_applylsn: u64,
pub ps_flushlsn: u64,
pub ps_replytime: SystemTime,
/// LSN last received and ingested by the pageserver.
pub last_received_lsn: u64,
/// LSN up to which data is persisted by the pageserver to its local disc.
pub disk_consistent_lsn: u64,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
pub remote_consistent_lsn: u64,
pub replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback.
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5;
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl ReplicationFeedback {
pub fn empty() -> ReplicationFeedback {
ReplicationFeedback {
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
current_timeline_size: 0,
ps_writelsn: 0,
ps_applylsn: 0,
ps_flushlsn: 0,
ps_replytime: SystemTime::now(),
last_received_lsn: 0,
remote_consistent_lsn: 0,
disk_consistent_lsn: 0,
replytime: SystemTime::now(),
}
}
// Serialize ReplicationFeedback using custom format
// Serialize PageserverFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
@@ -974,24 +982,26 @@ impl ReplicationFeedback {
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
//
// TODO: change serialized fields names once all computes migrate to rename.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.ps_writelsn);
buf.put_u64(self.last_received_lsn);
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.ps_flushlsn);
buf.put_u64(self.disk_consistent_lsn);
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.ps_applylsn);
buf.put_u64(self.remote_consistent_lsn);
let timestamp = self
.ps_replytime
.replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
@@ -1001,9 +1011,10 @@ impl ReplicationFeedback {
buf.put_i64(timestamp);
}
// Deserialize ReplicationFeedback message
pub fn parse(mut buf: Bytes) -> ReplicationFeedback {
let mut rf = ReplicationFeedback::empty();
// Deserialize PageserverFeedback message
// TODO: change serialized fields names once all computes migrate to rename.
pub fn parse(mut buf: Bytes) -> PageserverFeedback {
let mut rf = PageserverFeedback::empty();
let nfields = buf.get_u8();
for _ in 0..nfields {
let key = read_cstr(&mut buf).unwrap();
@@ -1016,39 +1027,39 @@ impl ReplicationFeedback {
b"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.ps_writelsn = buf.get_u64();
rf.last_received_lsn = buf.get_u64();
}
b"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.ps_flushlsn = buf.get_u64();
rf.disk_consistent_lsn = buf.get_u64();
}
b"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.ps_applylsn = buf.get_u64();
rf.remote_consistent_lsn = buf.get_u64();
}
b"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
rf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
rf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
warn!(
"ReplicationFeedback parse. unknown key {} of len {len}. Skip it.",
"PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
String::from_utf8_lossy(key.as_ref())
);
buf.advance(len as usize);
}
}
}
trace!("ReplicationFeedback parsed is {:?}", rf);
trace!("PageserverFeedback parsed is {:?}", rf);
rf
}
}
@@ -1059,33 +1070,33 @@ mod tests {
#[test]
fn test_replication_feedback_serialization() {
let mut rf = ReplicationFeedback::empty();
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = ReplicationFeedback::parse(data.freeze());
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = ReplicationFeedback::empty();
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1;
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data.put_slice(b"new_field_one\0");
@@ -1093,7 +1104,7 @@ mod tests {
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let rf_parsed = ReplicationFeedback::parse(data.freeze());
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}

View File

@@ -26,3 +26,4 @@ workspace_hack.workspace = true
[dev-dependencies]
tempfile.workspace = true
test-context.workspace = true

View File

@@ -39,6 +39,9 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currenltly means 1000 for AWS S3.
/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
@@ -64,6 +67,10 @@ impl RemotePath {
pub fn object_name(&self) -> Option<&str> {
self.0.file_name().and_then(|os_str| os_str.to_str())
}
pub fn join(&self, segment: &Path) -> Self {
Self(self.0.join(segment))
}
}
/// Storage (potentially remote) API to manage its state.
@@ -71,9 +78,6 @@ impl RemotePath {
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync + 'static {
/// Lists all items the storage has right now.
async fn list(&self) -> anyhow::Result<Vec<RemotePath>>;
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
/// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
@@ -266,6 +270,7 @@ pub struct S3Config {
/// AWS S3 has various limits on its API calls, we need not to exceed those.
/// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
pub concurrency_limit: NonZeroUsize,
pub max_keys_per_list_response: Option<i32>,
}
impl Debug for S3Config {
@@ -275,6 +280,10 @@ impl Debug for S3Config {
.field("bucket_region", &self.bucket_region)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
@@ -303,6 +312,11 @@ impl RemoteStorageConfig {
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let max_keys_per_list_response =
parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
.context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
.or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
let storage = match (local_path, bucket_name, bucket_region) {
// no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
(None, None, None) => return Ok(None),
@@ -324,6 +338,7 @@ impl RemoteStorageConfig {
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
max_keys_per_list_response,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,

View File

@@ -73,10 +73,8 @@ impl LocalFs {
Ok(None)
}
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
#[cfg(test)]
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
Ok(get_all_files(&self.storage_root, true)
.await?
@@ -91,7 +89,10 @@ impl RemoteStorage for LocalFs {
})
.collect())
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,

View File

@@ -102,6 +102,7 @@ pub struct S3Bucket {
client: Client,
bucket_name: String,
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
// The helps to ensure we don't exceed the thresholds.
@@ -164,6 +165,7 @@ impl S3Bucket {
Ok(Self {
client,
bucket_name: aws_config.bucket_name.clone(),
max_keys_per_list_response: aws_config.max_keys_per_list_response,
prefix_in_bucket,
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
})
@@ -273,48 +275,6 @@ impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
let mut document_keys = Vec::new();
let mut continuation_token = None;
loop {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list")?;
metrics::inc_list_objects();
let fetch_response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(self.prefix_in_bucket.clone())
.set_continuation_token(continuation_token)
.send()
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})?;
document_keys.extend(
fetch_response
.contents
.unwrap_or_default()
.into_iter()
.filter_map(|o| Some(self.s3_object_to_relative_path(o.key()?))),
);
match fetch_response.continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it wont include empty "directories"
async fn list_prefixes(
@@ -354,6 +314,7 @@ impl RemoteStorage for S3Bucket {
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.map_err(|e| {
@@ -371,7 +332,7 @@ impl RemoteStorage for S3Bucket {
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
);
match fetch_response.continuation_token {
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}

View File

@@ -20,7 +20,6 @@ pub struct UnreliableWrapper {
/// Used to identify retries of different unique operation.
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
List,
ListPrefixes(Option<RemotePath>),
Upload(RemotePath),
Download(RemotePath),
@@ -75,12 +74,6 @@ impl UnreliableWrapper {
#[async_trait::async_trait]
impl RemoteStorage for UnreliableWrapper {
/// Lists all items the storage has right now.
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
self.attempt(RemoteOp::List)?;
self.inner.list().await
}
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,

View File

@@ -0,0 +1,275 @@
use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
use test_context::{test_context, AsyncTestContext};
use tokio::task::JoinSet;
use tracing::{debug, error, info};
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
/// Tests that S3 client can list all prefixes, even if the response come paginated and requires multiple S3 queries.
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified.
/// See the client creation in [`create_s3_client`] for details on the required env vars.
/// If real S3 tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the
/// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details.
///
/// First, the test creates a set of S3 objects with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_s3_data`]
/// where
/// * `random_prefix_part` is set for the entire S3 client during the S3 client creation in [`create_s3_client`], to avoid multiple test runs interference
/// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket
///
/// Then, verifies that the client does return correct prefixes when queried:
/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only
/// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}`
///
/// With the real S3 enabled and `#[cfg(test)]` Rust configuration used, the S3 client test adds a `max-keys` param to limit the response keys.
/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to S3,
/// since current default AWS S3 pagination limit is 1000.
/// (see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax)
///
/// Lastly, the test attempts to clean up and remove all uploaded S3 files.
/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished.
#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledS3::Enabled(ctx) => ctx,
MaybeEnabledS3::Disabled => return Ok(()),
MaybeEnabledS3::UploadsFailed(e, _) => anyhow::bail!("S3 init failed: {e:?}"),
};
let test_client = Arc::clone(&ctx.client_with_excessive_pagination);
let expected_remote_prefixes = ctx.remote_prefixes.clone();
let base_prefix =
RemotePath::new(Path::new(ctx.base_prefix_str)).context("common_prefix construction")?;
let root_remote_prefixes = test_client
.list_prefixes(None)
.await
.context("client list root prefixes failure")?
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(
root_remote_prefixes, HashSet::from([base_prefix.clone()]),
"remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}"
);
let nested_remote_prefixes = test_client
.list_prefixes(Some(&base_prefix))
.await
.context("client list nested prefixes failure")?
.into_iter()
.collect::<HashSet<_>>();
let remote_only_prefixes = nested_remote_prefixes
.difference(&expected_remote_prefixes)
.collect::<HashSet<_>>();
let missing_uploaded_prefixes = expected_remote_prefixes
.difference(&nested_remote_prefixes)
.collect::<HashSet<_>>();
assert_eq!(
remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0,
"remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}",
);
Ok(())
}
enum MaybeEnabledS3 {
Enabled(S3WithTestBlobs),
Disabled,
UploadsFailed(anyhow::Error, S3WithTestBlobs),
}
struct S3WithTestBlobs {
client_with_excessive_pagination: Arc<GenericRemoteStorage>,
base_prefix_str: &'static str,
remote_prefixes: HashSet<RemotePath>,
remote_blobs: HashSet<RemotePath>,
}
#[async_trait::async_trait]
impl AsyncTestContext for MaybeEnabledS3 {
async fn setup() -> Self {
utils::logging::init(utils::logging::LogFormat::Test).expect("logging init failed");
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
info!(
"`{}` env variable is not set, skipping the test",
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
);
return Self::Disabled;
}
let max_keys_in_list_response = 10;
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
let client_with_excessive_pagination = create_s3_client(max_keys_in_list_response)
.context("S3 client creation")
.expect("S3 client creation failed");
let base_prefix_str = "test/";
match upload_s3_data(
&client_with_excessive_pagination,
base_prefix_str,
upload_tasks_count,
)
.await
{
ControlFlow::Continue(uploads) => {
info!("Remote objects created successfully");
Self::Enabled(S3WithTestBlobs {
client_with_excessive_pagination,
base_prefix_str,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
})
}
ControlFlow::Break(uploads) => Self::UploadsFailed(
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
S3WithTestBlobs {
client_with_excessive_pagination,
base_prefix_str,
remote_prefixes: uploads.prefixes,
remote_blobs: uploads.blobs,
},
),
}
}
async fn teardown(self) {
match self {
Self::Disabled => {}
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
cleanup(&ctx.client_with_excessive_pagination, ctx.remote_blobs).await;
}
}
}
}
fn create_s3_client(max_keys_per_list_response: i32) -> anyhow::Result<Arc<GenericRemoteStorage>> {
let remote_storage_s3_bucket = env::var("REMOTE_STORAGE_S3_BUCKET")
.context("`REMOTE_STORAGE_S3_BUCKET` env var is not set, but real S3 tests are enabled")?;
let remote_storage_s3_region = env::var("REMOTE_STORAGE_S3_REGION")
.context("`REMOTE_STORAGE_S3_REGION` env var is not set, but real S3 tests are enabled")?;
let random_prefix_part = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("random s3 test prefix part calculation")?
.as_millis();
let remote_storage_config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
max_sync_errors: NonZeroU32::new(5).unwrap(),
storage: RemoteStorageKind::AwsS3(S3Config {
bucket_name: remote_storage_s3_bucket,
bucket_region: remote_storage_s3_region,
prefix_in_bucket: Some(format!("pagination_should_work_test_{random_prefix_part}/")),
endpoint: None,
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: Some(max_keys_per_list_response),
}),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
))
}
struct Uploads {
prefixes: HashSet<RemotePath>,
blobs: HashSet<RemotePath>,
}
async fn upload_s3_data(
client: &Arc<GenericRemoteStorage>,
base_prefix_str: &'static str,
upload_tasks_count: usize,
) -> ControlFlow<Uploads, Uploads> {
info!("Creating {upload_tasks_count} S3 files");
let mut upload_tasks = JoinSet::new();
for i in 1..upload_tasks_count + 1 {
let task_client = Arc::clone(client);
upload_tasks.spawn(async move {
let prefix = PathBuf::from(format!("{base_prefix_str}/sub_prefix_{i}/"));
let blob_prefix = RemotePath::new(&prefix)
.with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
let blob_path = blob_prefix.join(Path::new(&format!("blob_{i}")));
debug!("Creating remote item {i} at path {blob_path:?}");
let data = format!("remote blob data {i}").into_bytes();
let data_len = data.len();
task_client
.upload(
Box::new(std::io::Cursor::new(data)),
data_len,
&blob_path,
None,
)
.await?;
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
});
}
let mut upload_tasks_failed = false;
let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count);
let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
while let Some(task_run_result) = upload_tasks.join_next().await {
match task_run_result
.context("task join failed")
.and_then(|task_result| task_result.context("upload task failed"))
{
Ok((upload_prefix, upload_path)) => {
uploaded_prefixes.insert(upload_prefix);
uploaded_blobs.insert(upload_path);
}
Err(e) => {
error!("Upload task failed: {e:?}");
upload_tasks_failed = true;
}
}
}
let uploads = Uploads {
prefixes: uploaded_prefixes,
blobs: uploaded_blobs,
};
if upload_tasks_failed {
ControlFlow::Break(uploads)
} else {
ControlFlow::Continue(uploads)
}
}
async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<RemotePath>) {
info!(
"Removing {} objects from the remote storage during cleanup",
objects_to_delete.len()
);
let mut delete_tasks = JoinSet::new();
for object_to_delete in objects_to_delete {
let task_client = Arc::clone(client);
delete_tasks.spawn(async move {
debug!("Deleting remote item at path {object_to_delete:?}");
task_client
.delete(&object_to_delete)
.await
.with_context(|| format!("{object_to_delete:?} removal"))
});
}
while let Some(task_run_result) = delete_tasks.join_next().await {
match task_run_result {
Ok(task_result) => match task_result {
Ok(()) => {}
Err(e) => error!("Delete task failed: {e:?}"),
},
Err(join_err) => error!("Delete task did not finish correctly: {join_err}"),
}
}
}

View File

@@ -20,6 +20,9 @@ pub enum ApiError {
#[error("Conflict: {0}")]
Conflict(String),
#[error("Precondition failed: {0}")]
PreconditionFailed(&'static str),
#[error(transparent)]
InternalServerError(anyhow::Error),
}
@@ -44,6 +47,10 @@ impl ApiError {
ApiError::Conflict(_) => {
HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::CONFLICT)
}
ApiError::PreconditionFailed(_) => HttpErrorBody::response_from_msg_and_status(
self.to_string(),
StatusCode::PRECONDITION_FAILED,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,

View File

@@ -11,6 +11,14 @@ use serde::{Deserialize, Serialize};
pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);
impl Percent {
pub const fn new(pct: u8) -> Option<Self> {
if pct <= 100 {
Some(Percent(pct))
} else {
None
}
}
pub fn get(&self) -> u8 {
self.0
}

View File

@@ -1,25 +1,7 @@
use signal_hook::flag;
use signal_hook::iterator::Signals;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
pub fn install_shutdown_handlers() -> anyhow::Result<ShutdownSignals> {
let term_now = Arc::new(AtomicBool::new(false));
for sig in TERM_SIGNALS {
// When terminated by a second term signal, exit with exit code 1.
// This will do nothing the first time (because term_now is false).
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
// But this will "arm" the above for the second time, by setting it to true.
// The order of registering these is important, if you put this one first, it will
// first arm and then terminate all in the first round.
flag::register(*sig, Arc::clone(&term_now))?;
}
Ok(ShutdownSignals)
}
pub enum Signal {
Quit,
Interrupt,
@@ -39,10 +21,7 @@ impl Signal {
pub struct ShutdownSignals;
impl ShutdownSignals {
pub fn handle(
self,
mut handler: impl FnMut(Signal) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
pub fn handle(mut handler: impl FnMut(Signal) -> anyhow::Result<()>) -> anyhow::Result<()> {
for raw_signal in Signals::new(TERM_SIGNALS)?.into_iter() {
let signal = match raw_signal {
SIGINT => Signal::Interrupt,

View File

@@ -25,11 +25,9 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::signals::ShutdownSignals;
use utils::{
auth::JwtAuth,
logging, project_git_version,
sentry_init::init_sentry,
signals::{self, Signal},
auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
tcp_listener,
};
@@ -264,9 +262,6 @@ fn start_pageserver(
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Install signal handlers
let signals = signals::install_shutdown_handlers()?;
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
@@ -430,7 +425,7 @@ fn start_pageserver(
}
// All started up! Now just sit and wait for shutdown signal.
signals.handle(|signal| match signal {
ShutdownSignals::handle(|signal| match signal {
Signal::Quit => {
info!(
"Got {}. Terminating in immediate shutdown mode",

View File

@@ -170,6 +170,10 @@ pub struct PageServerConf {
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
/// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
/// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
/// See the comment in `eviction_task` for details.
pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
// How often to collect metrics and send them to the metrics endpoint.
pub metric_collection_interval: Duration,
@@ -246,7 +250,7 @@ struct PageServerConfigBuilder {
log_format: BuilderValue<LogFormat>,
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,
metric_collection_interval: BuilderValue<Duration>,
cached_metric_collection_interval: BuilderValue<Duration>,
@@ -295,7 +299,9 @@ impl Default for PageServerConfigBuilder {
.expect("cannot parse default keepalive interval")),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
concurrent_tenant_size_logical_size_queries: Set(
ConfigurableSemaphore::DEFAULT_INITIAL,
),
metric_collection_interval: Set(humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
@@ -400,7 +406,7 @@ impl PageServerConfigBuilder {
self.log_format = BuilderValue::Set(log_format)
}
pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: ConfigurableSemaphore) {
pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
}
@@ -449,6 +455,11 @@ impl PageServerConfigBuilder {
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?;
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
@@ -496,11 +507,12 @@ impl PageServerConfigBuilder {
.broker_keepalive_interval
.ok_or(anyhow!("No broker keepalive interval provided"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_size_logical_size_queries: self
.concurrent_tenant_size_logical_size_queries
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
metric_collection_interval: self
.metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?,
@@ -698,8 +710,7 @@ impl PageServerConf {
"concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
let input = parse_toml_string(key, item)?;
let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
ConfigurableSemaphore::new(permits)
NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
}),
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
@@ -860,6 +871,8 @@ impl PageServerConf {
broker_keepalive_interval: Duration::from_secs(5000),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
),
metric_collection_interval: Duration::from_secs(60),
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
@@ -953,6 +966,11 @@ impl ConfigurableSemaphore {
inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
}
}
/// Returns the configured amount of permits.
pub fn initial_permits(&self) -> NonZeroUsize {
self.initial_permits
}
}
impl Default for ConfigurableSemaphore {
@@ -1057,6 +1075,8 @@ log_format = 'json'
)?,
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
eviction_task_immitated_concurrent_logical_size_queries:
ConfigurableSemaphore::default(),
metric_collection_interval: humantime::parse_duration(
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
)?,
@@ -1118,6 +1138,8 @@ log_format = 'json'
broker_keepalive_interval: Duration::from_secs(5),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
eviction_task_immitated_concurrent_logical_size_queries:
ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(222),
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
@@ -1250,6 +1272,7 @@ broker_endpoint = '{broker_endpoint}'
prefix_in_bucket: Some(prefix_in_bucket.clone()),
endpoint: Some(endpoint.clone()),
concurrency_limit: s3_concurrency_limit,
max_keys_per_list_response: None,
}),
},
"Remote storage config should correctly parse the S3 config"

View File

@@ -22,7 +22,8 @@
//! If the actual usage is higher, the threshold is exceeded.
//! `min_avail_bytes` is the absolute available space in bytes.
//! If the actual usage is lower, the threshold is exceeded.
//!
//! If either of these thresholds is exceeded, the system is considered to have "disk pressure", and eviction
//! is performed on the next iteration, to release disk space and bring the usage below the thresholds again.
//! The iteration evicts layers in LRU fashion, but, with a weak reservation per tenant.
//! The reservation is to keep the most recently accessed X bytes per tenant resident.
//! If we cannot relieve pressure by evicting layers outside of the reservation, we
@@ -34,7 +35,11 @@
//! The idea is to allow at least one layer to be resident per tenant, to ensure it can make forward progress
//! during page reconstruction.
//! An alternative default for all tenants can be specified in the `tenant_config` section of the config.
//! Lastly, each tenant can have an override in their respectice tenant config (`min_resident_size_override`).
//! Lastly, each tenant can have an override in their respective tenant config (`min_resident_size_override`).
// Implementation notes:
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
// reading these fields. We use the Debug impl for semi-structured logging, though.
use std::{
collections::HashMap,
@@ -224,6 +229,7 @@ pub enum IterationOutcome<U> {
Finished(IterationOutcomeFinished<U>),
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
pub struct IterationOutcomeFinished<U> {
/// The actual usage observed before we started the iteration.
@@ -238,6 +244,7 @@ pub struct IterationOutcomeFinished<U> {
}
#[derive(Debug, Serialize)]
#[allow(dead_code)]
struct AssumedUsage<U> {
/// The expected value for `after`, after phase 2.
projected_after: U,
@@ -245,12 +252,14 @@ struct AssumedUsage<U> {
failed: LayerCount,
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
struct PlannedUsage<U> {
respecting_tenant_min_resident_size: U,
fallback_to_global_lru: Option<U>,
}
#[allow(dead_code)]
#[derive(Debug, Default, Serialize)]
struct LayerCount {
file_sizes: u64,
@@ -608,6 +617,7 @@ mod filesystem_level_usage {
use super::DiskUsageEvictionTaskConfig;
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub struct Usage<'a> {
config: &'a DiskUsageEvictionTaskConfig,
@@ -629,7 +639,7 @@ mod filesystem_level_usage {
),
(
"max_usage_pct",
usage_pct > self.config.max_usage_pct.get() as u64,
usage_pct >= self.config.max_usage_pct.get() as u64,
),
];
@@ -676,4 +686,43 @@ mod filesystem_level_usage {
avail_bytes,
})
}
#[test]
fn max_usage_pct_pressure() {
use super::Usage as _;
use std::time::Duration;
use utils::serde_percent::Percent;
let mut usage = Usage {
config: &DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(85).unwrap(),
min_avail_bytes: 0,
period: Duration::MAX,
#[cfg(feature = "testing")]
mock_statvfs: None,
},
total_bytes: 100_000,
avail_bytes: 0,
};
assert!(usage.has_pressure(), "expected pressure at 100%");
usage.add_available_bytes(14_000);
assert!(usage.has_pressure(), "expected pressure at 86%");
usage.add_available_bytes(999);
assert!(usage.has_pressure(), "expected pressure at 85.001%");
usage.add_available_bytes(1);
assert!(usage.has_pressure(), "expected pressure at precisely 85%");
usage.add_available_bytes(1);
assert!(!usage.has_pressure(), "no pressure at 84.999%");
usage.add_available_bytes(999);
assert!(!usage.has_pressure(), "no pressure at 84%");
usage.add_available_bytes(16_000);
assert!(!usage.has_pressure());
}
}

View File

@@ -214,6 +214,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"412":
description: Tenant is missing
content:
application/json:
schema:
$ref: "#/components/schemas/PreconditionFailedError"
"500":
description: Generic operation error
content:
@@ -891,13 +898,9 @@ components:
type: object
properties:
tenant_specific_overrides:
type: object
schema:
$ref: "#/components/schemas/TenantConfigInfo"
$ref: "#/components/schemas/TenantConfigInfo"
effective_config:
type: object
schema:
$ref: "#/components/schemas/TenantConfigInfo"
$ref: "#/components/schemas/TenantConfigInfo"
TimelineInfo:
type: object
required:
@@ -983,6 +986,13 @@ components:
properties:
msg:
type: string
PreconditionFailedError:
type: object
required:
- msg
properties:
msg:
type: string
security:
- JWT: []

View File

@@ -152,6 +152,11 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
use crate::tenant::mgr::DeleteTimelineError::*;
match value {
// Report Precondition failed so client can distinguish between
// "tenant is missing" case from "timeline is missing"
Tenant(TenantStateError::NotFound(..)) => {
ApiError::PreconditionFailed("Requested tenant is missing")
}
Tenant(t) => ApiError::from(t),
Timeline(t) => ApiError::from(t),
}

View File

@@ -257,7 +257,7 @@ impl EvictionsWithLowResidenceDuration {
}
pub fn observe(&self, observed_value: Duration) {
if self.threshold < observed_value {
if observed_value < self.threshold {
self.counter
.as_ref()
.expect("nobody calls this function after `remove_from_vec`")

View File

@@ -27,6 +27,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::io;
use std::net::TcpListener;
use std::pin::pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -466,8 +467,7 @@ impl PageServerHandler {
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_reader = StreamReader::new(copyin_stream(pgb));
tokio::pin!(copyin_reader);
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.await?;
@@ -512,8 +512,7 @@ impl PageServerHandler {
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_reader = StreamReader::new(copyin_stream(pgb));
tokio::pin!(copyin_reader);
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");

View File

@@ -46,6 +46,7 @@ use std::time::{Duration, Instant};
use self::config::TenantConf;
use self::metadata::TimelineMetadata;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::EvictionTaskTenantState;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir;
@@ -142,6 +143,8 @@ pub struct Tenant {
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
}
/// A timeline with some of its files on disk, being initialized.
@@ -1788,6 +1791,7 @@ impl Tenant {
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
}
}

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -352,6 +353,10 @@ async fn fill_logical_sizes(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let cancel = tokio_util::sync::CancellationToken::new();
// be sure to cancel all spawned tasks if we are dropped
let _dg = cancel.clone().drop_guard();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
for seg in segments.iter() {
@@ -373,6 +378,7 @@ async fn fill_logical_sizes(
timeline,
lsn,
ctx,
cancel.child_token(),
));
}
e.insert(cached_size);
@@ -477,13 +483,14 @@ async fn calculate_logical_size(
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, ctx)
.spawn_ondemand_logical_size_calculation(lsn, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))

View File

@@ -25,6 +25,7 @@ use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
@@ -72,6 +73,7 @@ use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use super::layer_map::BatchedUpdates;
@@ -677,8 +679,7 @@ impl Timeline {
let mut failed = 0;
let cancelled = task_mgr::shutdown_watcher();
tokio::pin!(cancelled);
let mut cancelled = pin!(task_mgr::shutdown_watcher());
loop {
tokio::select! {
@@ -1126,6 +1127,9 @@ impl Timeline {
self.metrics
.evictions_with_low_residence_duration
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
info!(layer=%local_layer.short_id(), "evicted layer after unknown residence period");
}
true
@@ -1768,8 +1772,11 @@ impl Timeline {
false,
// NB: don't log errors here, task_mgr will do that.
async move {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, &background_ctx)
.logical_size_calculation_task(lsn, &background_ctx, cancel)
.await
{
Ok(s) => s,
@@ -1824,6 +1831,7 @@ impl Timeline {
self: &Arc<Self>,
lsn: Lsn,
ctx: RequestContext,
cancel: CancellationToken,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
let (sender, receiver) = oneshot::channel();
let self_clone = Arc::clone(self);
@@ -1843,7 +1851,9 @@ impl Timeline {
"ondemand logical size calculation",
false,
async move {
let res = self_clone.logical_size_calculation_task(lsn, &ctx).await;
let res = self_clone
.logical_size_calculation_task(lsn, &ctx, cancel)
.await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
},
@@ -1856,18 +1866,18 @@ impl Timeline {
self: &Arc<Self>,
lsn: Lsn,
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let cancel = CancellationToken::new();
let calculation = async {
let mut calculation = pin!(async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cancel, &ctx)
.await
};
});
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
@@ -1896,7 +1906,6 @@ impl Timeline {
"aborted because task_mgr shutdown requested".to_string()
};
tokio::pin!(calculation);
loop {
tokio::select! {
res = &mut calculation => { return res }

View File

@@ -14,6 +14,7 @@
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
collections::HashMap,
ops::ControlFlow,
sync::Arc,
time::{Duration, SystemTime},
@@ -29,6 +30,7 @@ use crate::{
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
Tenant,
},
};
@@ -36,7 +38,12 @@ use super::Timeline;
#[derive(Default)]
pub struct EvictionTaskTimelineState {
last_refresh_required_in_restart: Option<tokio::time::Instant>,
last_layer_access_imitation: Option<tokio::time::Instant>,
}
#[derive(Default)]
pub struct EvictionTaskTenantState {
last_layer_access_imitation: Option<Instant>,
}
impl Timeline {
@@ -126,6 +133,35 @@ impl Timeline {
) -> ControlFlow<()> {
let now = SystemTime::now();
// If we evict layers but keep cached values derived from those layers, then
// we face a storm of on-demand downloads after pageserver restart.
// The reason is that the restart empties the caches, and so, the values
// need to be re-computed by accessing layers, which we evicted while the
// caches were filled.
//
// Solutions here would be one of the following:
// 1. Have a persistent cache.
// 2. Count every access to a cached value to the access stats of all layers
// that were accessed to compute the value in the first place.
// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
// get re-computed from layers, thereby counting towards layer access stats.
// 4. Make the eviction task imitate the layer accesses that typically hit caches.
//
// We follow approach (4) here because in Neon prod deployment:
// - page cache is quite small => high churn => low hit rate
// => eviction gets correct access stats
// - value-level caches such as logical size & repatition have a high hit rate,
// especially for inactive tenants
// => eviction sees zero accesses for these
// => they cause the on-demand download storm on pageserver restart
//
// We should probably move to persistent caches in the future, or avoid
// having inactive tenants attached to pageserver in the first place.
match self.imitate_layer_accesses(p, cancel, ctx).await {
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
#[allow(dead_code)]
#[derive(Debug, Default)]
struct EvictionStats {
@@ -136,27 +172,6 @@ impl Timeline {
skipped_for_shutdown: usize,
}
// what we want is to invalidate any caches which haven't been accessed for `p.threshold`,
// but we cannot actually do it for current limitations except by restarting pageserver. we
// just recompute the values which would be recomputed on startup.
//
// for active tenants this will likely materialized page cache or in-memory layers. for
// inactive tenants it will refresh the last_access timestamps so that we will not evict
// and re-download on restart these layers.
let mut state = self.eviction_task_timeline_state.lock().await;
match state.last_refresh_required_in_restart {
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
_ => {
self.refresh_layers_required_in_restart(cancel, ctx).await;
state.last_refresh_required_in_restart = Some(tokio::time::Instant::now())
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
let mut stats = EvictionStats::default();
// Gather layers for eviction.
// NB: all the checks can be invalidated as soon as we release the layer map lock.
@@ -254,8 +269,55 @@ impl Timeline {
ControlFlow::Continue(())
}
async fn imitate_layer_accesses(
&self,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<()> {
let mut state = self.eviction_task_timeline_state.lock().await;
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
_ => {
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
// This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
let Ok(tenant) = crate::tenant::mgr::get_tenant(self.tenant_id, true).await else {
// likely, we're shutting down
return ControlFlow::Break(());
};
let mut state = tenant.eviction_task_tenant_state.lock().await;
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
_ => {
self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
}
/// Recompute the values which would cause on-demand downloads during restart.
async fn refresh_layers_required_in_restart(
async fn imitate_timeline_cached_layer_accesses(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
@@ -289,4 +351,61 @@ impl Timeline {
}
}
}
// Imitate the synthetic size calculation done by the consumption_metrics module.
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
ctx: &RequestContext,
cancel: &CancellationToken,
) {
if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config.
// So, no need to imitate the accesses in that case.
return;
}
// The consumption metrics are collected on a per-tenant basis, by a single
// global background loop.
// It limits the number of synthetic size calculations using the global
// `concurrent_tenant_size_logical_size_queries` semaphore to not overload
// the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
//
// If we used that same semaphore here, then we'd compete for the
// same permits, which may impact timeliness of consumption metrics.
// That is a no-go, as consumption metrics are much more important
// than what we do here.
//
// So, we have a separate semaphore, initialized to the same
// number of permits as the `concurrent_tenant_size_logical_size_queries`.
// In the worst, we would have twice the amount of concurrenct size calculations.
// But in practice, the `p.threshold` >> `consumption metric interval`, and
// we spread out the eviction task using `random_init_delay`.
// So, the chance of the worst case is quite low in practice.
// It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
// So, we must coordinate with other with other eviction tasks of this tenant.
let limit = self
.conf
.eviction_task_immitated_concurrent_logical_size_queries
.inner();
let mut throwaway_cache = HashMap::new();
let gather =
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx);
tokio::select! {
_ = cancel.cancelled() => {}
gather_result = gather => {
match gather_result {
Ok(_) => {},
Err(e) => {
// We don't care about the result, but, if it failed, we should log it,
// since consumption metric might be hitting the cached value and
// thus not encountering this error.
warn!("failed to imitate synthetic size calculation accesses: {e:#}")
}
}
}
}
}
}

View File

@@ -237,11 +237,7 @@ async fn connection_manager_loop_step(
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
info!("Switching to new connection candidate: {new_candidate:?}");
walreceiver_state
.change_connection(
new_candidate.safekeeper_id,
new_candidate.wal_source_connconf,
ctx,
)
.change_connection(new_candidate, ctx)
.await
}
}
@@ -346,6 +342,8 @@ struct WalConnection {
started_at: NaiveDateTime,
/// Current safekeeper pageserver is connected to for WAL streaming.
sk_id: NodeId,
/// Availability zone of the safekeeper.
availability_zone: Option<String>,
/// Status of the connection.
status: WalConnectionStatus,
/// WAL streaming task handle.
@@ -405,12 +403,7 @@ impl WalreceiverState {
}
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(
&mut self,
new_sk_id: NodeId,
new_wal_source_connconf: PgConnectionConfig,
ctx: &RequestContext,
) {
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
self.drop_old_connection(true).await;
let id = self.id;
@@ -424,7 +417,7 @@ impl WalreceiverState {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
new_wal_source_connconf,
new_sk.wal_source_connconf,
events_sender,
cancellation,
connect_timeout,
@@ -433,13 +426,16 @@ impl WalreceiverState {
.await
.context("walreceiver connection handling failure")
}
.instrument(info_span!("walreceiver_connection", id = %id, node_id = %new_sk_id))
.instrument(
info_span!("walreceiver_connection", id = %id, node_id = %new_sk.safekeeper_id),
)
});
let now = Utc::now().naive_utc();
self.wal_connection = Some(WalConnection {
started_at: now,
sk_id: new_sk_id,
sk_id: new_sk.safekeeper_id,
availability_zone: new_sk.availability_zone,
status: WalConnectionStatus {
is_connected: false,
has_processed_wal: false,
@@ -546,6 +542,7 @@ impl WalreceiverState {
/// * if connected safekeeper is not present, pick the candidate
/// * if we haven't received any updates for some time, pick the candidate
/// * if the candidate commit_lsn is much higher than the current one, pick the candidate
/// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
/// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
///
/// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
@@ -559,6 +556,7 @@ impl WalreceiverState {
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
self.select_connection_candidate(Some(connected_sk_node))?;
let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
let now = Utc::now().naive_utc();
if let Ok(latest_interaciton) =
@@ -569,6 +567,7 @@ impl WalreceiverState {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
availability_zone: new_availability_zone,
reason: ReconnectReason::NoKeepAlives {
last_keep_alive: Some(
existing_wal_connection.status.latest_connection_update,
@@ -594,6 +593,7 @@ impl WalreceiverState {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
availability_zone: new_availability_zone,
reason: ReconnectReason::LaggingWal {
current_commit_lsn,
new_commit_lsn,
@@ -601,6 +601,20 @@ impl WalreceiverState {
},
});
}
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
// and the current one is not, switch to the new one.
if self.availability_zone.is_some()
&& existing_wal_connection.availability_zone
!= self.availability_zone
&& self.availability_zone == new_availability_zone
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
availability_zone: new_availability_zone,
wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::SwitchAvailabilityZone,
});
}
}
None => debug!(
"Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
@@ -668,6 +682,7 @@ impl WalreceiverState {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
availability_zone: new_availability_zone,
reason: ReconnectReason::NoWalTimeout {
current_lsn,
current_commit_lsn,
@@ -686,10 +701,11 @@ impl WalreceiverState {
self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
}
None => {
let (new_sk_id, _, new_wal_source_connconf) =
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
self.select_connection_candidate(None)?;
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::NoExistingConnection,
});
@@ -794,6 +810,7 @@ impl WalreceiverState {
struct NewWalConnectionCandidate {
safekeeper_id: NodeId,
wal_source_connconf: PgConnectionConfig,
availability_zone: Option<String>,
// This field is used in `derive(Debug)` only.
#[allow(dead_code)]
reason: ReconnectReason,
@@ -808,6 +825,7 @@ enum ReconnectReason {
new_commit_lsn: Lsn,
threshold: NonZeroU64,
},
SwitchAvailabilityZone,
NoWalTimeout {
current_lsn: Lsn,
current_commit_lsn: Lsn,
@@ -873,6 +891,7 @@ mod tests {
peer_horizon_lsn: 0,
local_start_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
},
latest_update,
}
@@ -933,6 +952,7 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
availability_zone: None,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
@@ -1095,6 +1115,7 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
availability_zone: None,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
@@ -1160,6 +1181,7 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: NodeId(1),
availability_zone: None,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
@@ -1222,6 +1244,7 @@ mod tests {
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: NodeId(1),
availability_zone: None,
status: connection_status,
connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
discovered_new_wal: Some(NewCommittedWAL {
@@ -1289,4 +1312,74 @@ mod tests {
availability_zone: None,
}
}
#[tokio::test]
async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
// Pageserver and one of safekeepers will be in the same availability zone
// and pageserver should prefer to connect to it.
let test_az = Some("test_az".to_owned());
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let mut state = dummy_state(&harness).await;
state.availability_zone = test_az.clone();
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
let connected_sk_id = NodeId(0);
let connection_status = WalConnectionStatus {
is_connected: true,
has_processed_wal: true,
latest_connection_update: now,
latest_wal_update: now,
commit_lsn: Some(current_lsn),
streaming_lsn: Some(current_lsn),
};
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
availability_zone: None,
status: connection_status,
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status))
.ok();
Ok(())
}),
discovered_new_wal: None,
});
// We have another safekeeper with the same commit_lsn, and it have the same availability zone as
// the current pageserver.
let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
same_az_sk.timeline.availability_zone = test_az.clone();
state.wal_stream_candidates = HashMap::from([
(
connected_sk_id,
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
),
(NodeId(1), same_az_sk),
]);
// We expect that pageserver will switch to the safekeeper in the same availability zone,
// even if it has the same commit_lsn.
let next_candidate = state.next_connection_candidate().expect(
"Expected one candidate selected out of multiple valid data options, but got none",
);
assert_eq!(next_candidate.safekeeper_id, NodeId(1));
assert_eq!(
next_candidate.reason,
ReconnectReason::SwitchAvailabilityZone,
"Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
);
assert_eq!(
next_candidate.wal_source_connconf.host(),
&Host::Domain("same_az".to_owned())
);
Ok(())
}
}

View File

@@ -2,6 +2,7 @@
use std::{
error::Error,
pin::pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
@@ -17,7 +18,7 @@ use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio::{select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
@@ -36,7 +37,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::ReplicationFeedback;
use pq_proto::PageserverFeedback;
use utils::lsn::Lsn;
/// Status of the connection.
@@ -187,8 +188,7 @@ pub async fn handle_walreceiver_connection(
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
@@ -319,12 +319,12 @@ pub async fn handle_walreceiver_connection(
timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
let last_received_lsn = u64::from(last_lsn);
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = u64::from(timeline.get_disk_consistent_lsn());
let disk_consistent_lsn = u64::from(timeline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let remote_consistent_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the status about what we just received. This is shown in the mgmt API.
@@ -343,12 +343,12 @@ pub async fn handle_walreceiver_connection(
let (timeline_logical_size, _) = timeline
.get_current_logical_size(&ctx)
.context("Status update creation failed to get current logical size")?;
let status_update = ReplicationFeedback {
let status_update = PageserverFeedback {
current_timeline_size: timeline_logical_size,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,
ps_applylsn: apply_lsn,
ps_replytime: ts,
last_received_lsn,
disk_consistent_lsn,
remote_consistent_lsn,
replytime: ts,
};
debug!("neon_status_update {status_update:?}");

View File

@@ -14,6 +14,7 @@
*/
#include <sys/file.h>
#include <sys/statvfs.h>
#include <unistd.h>
#include <fcntl.h>
@@ -34,6 +35,9 @@
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "storage/procsignal.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -59,6 +63,9 @@
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
typedef struct FileCacheEntry
{
BufferTag key;
@@ -71,6 +78,7 @@ typedef struct FileCacheEntry
typedef struct FileCacheControl
{
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;
@@ -79,12 +87,14 @@ static int lfc_desc;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_free_space_watermark;
static char* lfc_path;
static FileCacheControl* lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
static void
lfc_shmem_startup(void)
@@ -112,6 +122,7 @@ lfc_shmem_startup(void)
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */
@@ -165,7 +176,7 @@ lfc_change_limit_hook(int newval, void *extra)
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->size && !dlist_is_empty(&lfc_ctl->lru))
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
@@ -175,12 +186,86 @@ lfc_change_limit_hook(int newval, void *extra)
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
lfc_ctl->size -= 1;
lfc_ctl->used -= 1;
}
elog(LOG, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}
/*
* Local file system state monitor check available free space.
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
* but throwing away least recently accessed chunks.
* First time low space watermark is reached cache size is divided by two,
* second time by four,... Finally we remove all chunks from local cache.
*
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
* We only throw away cached chunks but do not prevent from filling cache by new chunks.
*
* Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
* Calling statvfs each second should not add any noticeable overhead.
*/
void
FileCacheMonitorMain(Datum main_arg)
{
/*
* Choose file system state monitor interval so that space can not be exosted
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
*/
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();
/* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending)
{
if (lfc_size_limit != 0)
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
{
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
}
else
{
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
{
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
}
}
pg_usleep(monitor_interval);
}
}
static void
lfc_register_free_space_monitor(void)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
void
lfc_init(void)
{
@@ -217,6 +302,19 @@ lfc_init(void)
lfc_change_limit_hook,
NULL);
DefineCustomIntVariable("neon.free_space_watermark",
"Minimal free space in local file system after reaching which local file cache will be truncated",
NULL,
&lfc_free_space_watermark,
1024, /* 1GB */
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
@@ -231,6 +329,9 @@ lfc_init(void)
if (lfc_max_size == 0)
return;
if (lfc_free_space_watermark != 0)
lfc_register_free_space_monitor();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000
@@ -380,7 +481,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
* we prefer not to complicate code and use second approach.
*/
if (lfc_ctl->size >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
@@ -390,7 +491,10 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
elog(LOG, "Swap file cache page");
}
else
{
lfc_ctl->used += 1;
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
}
entry->access_count = 1;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}

View File

@@ -1872,9 +1872,9 @@ RecvAppendResponses(Safekeeper *sk)
return sk->state == SS_ACTIVE;
}
/* Parse a ReplicationFeedback message, or the ReplicationFeedback part of an AppendResponse */
/* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */
void
ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * rf)
ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf)
{
uint8 nkeys;
int i;
@@ -1892,45 +1892,45 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->currentClusterSize = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu",
elog(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
}
else if (strcmp(key, "ps_writelsn") == 0)
else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_writelsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_writelsn));
rf->last_received_lsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
}
else if (strcmp(key, "ps_flushlsn") == 0)
else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_flushlsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_flushlsn));
rf->disk_consistent_lsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
}
else if (strcmp(key, "ps_applylsn") == 0)
else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_applylsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_applylsn));
rf->remote_consistent_lsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
}
else if (strcmp(key, "ps_replytime") == 0)
else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_replytime = pq_getmsgint64(reply_message);
rf->replytime = pq_getmsgint64(reply_message);
{
char *replyTimeStr;
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime));
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s",
rf->ps_replytime, replyTimeStr);
replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime));
elog(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
pfree(replyTimeStr);
}
@@ -1944,7 +1944,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
* Skip unknown keys to support backward compatibile protocol
* changes
*/
elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len);
elog(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
};
}
@@ -2024,7 +2024,7 @@ GetAcknowledgedByQuorumWALPosition(void)
}
/*
* ReplicationFeedbackShmemSize --- report amount of shared memory space needed
* WalproposerShmemSize --- report amount of shared memory space needed
*/
Size
WalproposerShmemSize(void)
@@ -2054,10 +2054,10 @@ WalproposerShmemInit(void)
}
void
replication_feedback_set(ReplicationFeedback * rf)
replication_feedback_set(PageserverFeedback * rf)
{
SpinLockAcquire(&walprop_shared->mutex);
memcpy(&walprop_shared->feedback, rf, sizeof(ReplicationFeedback));
memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback));
SpinLockRelease(&walprop_shared->mutex);
}
@@ -2065,43 +2065,43 @@ void
replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
{
SpinLockAcquire(&walprop_shared->mutex);
*writeLsn = walprop_shared->feedback.ps_writelsn;
*flushLsn = walprop_shared->feedback.ps_flushlsn;
*applyLsn = walprop_shared->feedback.ps_applylsn;
*writeLsn = walprop_shared->feedback.last_received_lsn;
*flushLsn = walprop_shared->feedback.disk_consistent_lsn;
*applyLsn = walprop_shared->feedback.remote_consistent_lsn;
SpinLockRelease(&walprop_shared->mutex);
}
/*
* Get ReplicationFeedback fields from the most advanced safekeeper
* Get PageserverFeedback fields from the most advanced safekeeper
*/
static void
GetLatestNeonFeedback(ReplicationFeedback * rf)
GetLatestNeonFeedback(PageserverFeedback * rf)
{
int latest_safekeeper = 0;
XLogRecPtr ps_writelsn = InvalidXLogRecPtr;
XLogRecPtr last_received_lsn = InvalidXLogRecPtr;
for (int i = 0; i < n_safekeepers; i++)
{
if (safekeeper[i].appendResponse.rf.ps_writelsn > ps_writelsn)
if (safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn)
{
latest_safekeeper = i;
ps_writelsn = safekeeper[i].appendResponse.rf.ps_writelsn;
last_received_lsn = safekeeper[i].appendResponse.rf.last_received_lsn;
}
}
rf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
rf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_writelsn;
rf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_flushlsn;
rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn;
rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime;
rf->last_received_lsn = safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn;
rf->disk_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
rf->remote_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = safekeeper[latest_safekeeper].appendResponse.rf.replytime;
elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu",
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->ps_writelsn),
LSN_FORMAT_ARGS(rf->ps_flushlsn),
LSN_FORMAT_ARGS(rf->ps_applylsn),
rf->ps_replytime);
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
replication_feedback_set(rf);
}
@@ -2115,16 +2115,16 @@ HandleSafekeeperResponse(void)
XLogRecPtr minFlushLsn;
minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
diskConsistentLsn = quorumFeedback.rf.ps_flushlsn;
diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
if (!syncSafekeepers)
{
/* Get ReplicationFeedback fields from the most advanced safekeeper */
/* Get PageserverFeedback fields from the most advanced safekeeper */
GetLatestNeonFeedback(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
}
if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.ps_flushlsn)
if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
if (minQuorumLsn > quorumFeedback.flushLsn)
@@ -2142,7 +2142,7 @@ HandleSafekeeperResponse(void)
* apply_lsn - This is what processed and durably saved at*
* pageserver.
*/
quorumFeedback.rf.ps_flushlsn,
quorumFeedback.rf.disk_consistent_lsn,
GetCurrentTimestamp(), false);
}
@@ -2326,7 +2326,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE)
ParseReplicationFeedbackMessage(&s, &msg->rf);
ParsePageserverFeedbackMessage(&s, &msg->rf);
pq_getmsgend(&s);
return true;
}
@@ -2462,7 +2462,7 @@ backpressure_lag_impl(void)
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
#define MB ((XLogRecPtr)1024 * 1024)
elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X",
elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),

View File

@@ -280,21 +280,21 @@ typedef struct HotStandbyFeedback
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
typedef struct ReplicationFeedback
typedef struct PageserverFeedback
{
/* current size of the timeline on pageserver */
uint64 currentClusterSize;
/* standby_status_update fields that safekeeper received from pageserver */
XLogRecPtr ps_writelsn;
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
TimestampTz ps_replytime;
} ReplicationFeedback;
XLogRecPtr last_received_lsn;
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
slock_t mutex;
ReplicationFeedback feedback;
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
@@ -320,10 +320,10 @@ typedef struct AppendResponse
/* Feedback recieved from pageserver includes standby_status_update fields */
/* and custom neon feedback. */
/* This part of the message is extensible. */
ReplicationFeedback rf;
PageserverFeedback rf;
} AppendResponse;
/* ReplicationFeedback is extensible part of the message that is parsed separately */
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
@@ -383,13 +383,13 @@ extern void WalProposerSync(int argc, char *argv[]);
extern void WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback *rf);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(ReplicationFeedback *rf);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */

38
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.4.1 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -79,37 +79,35 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
version = "2.10.0"
version = "2.13.1"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "allure-pytest-2.10.0.tar.gz", hash = "sha256:3b2ab67629f4cbd8617abd817d2b22292c6eb7efd5584f992d1af8143aea6ee7"},
{file = "allure_pytest-2.10.0-py3-none-any.whl", hash = "sha256:08274096594758447db54c3b2c382526ee04f1fe12119cdaee92d2d93c84b530"},
{file = "allure-pytest-2.13.1.tar.gz", hash = "sha256:68d69456eeb65af4061ec06a80bc941163b0616e8216554d36b070a6bf070e08"},
{file = "allure_pytest-2.13.1-py3-none-any.whl", hash = "sha256:a8de2fc3b3effe2d8f98801646920de3f055b779710f4c806dbee7c613c24633"},
]
[package.dependencies]
allure-python-commons = "2.10.0"
allure-python-commons = "2.13.1"
pytest = ">=4.5.0"
six = ">=1.9.0"
[[package]]
name = "allure-python-commons"
version = "2.10.0"
version = "2.13.1"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.5"
python-versions = ">=3.6"
files = [
{file = "allure-python-commons-2.10.0.tar.gz", hash = "sha256:d4d31344b0f0037a4a11e16b91b28cf0eeb23ffa0e50c27fcfc6aabe72212d3c"},
{file = "allure_python_commons-2.10.0-py3-none-any.whl", hash = "sha256:2a717e8ca8d296bf89cd57f38fc3c21893bd7ea8cd02a6ae5420e6d1a6eda5d0"},
{file = "allure-python-commons-2.13.1.tar.gz", hash = "sha256:3fc13e1da8ebb23f9ab5c9c72ad04595023cdd5078dbb8604939997faebed5cb"},
{file = "allure_python_commons-2.13.1-py3-none-any.whl", hash = "sha256:d08e04867bddf44fef55def3d67f4bc25af58a1bf9fcffcf4ec3331f7f2ef0d0"},
]
[package.dependencies]
attrs = ">=16.0.0"
pluggy = ">=0.4.0"
six = ">=1.9.0"
[[package]]
name = "async-timeout"
@@ -1932,6 +1930,22 @@ pytest = [
{version = ">=6.2.4", markers = "python_version >= \"3.10\""},
]
[[package]]
name = "pytest-rerunfailures"
version = "11.1.2"
description = "pytest plugin to re-run tests to eliminate flaky failures"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest-rerunfailures-11.1.2.tar.gz", hash = "sha256:55611661e873f1cafa384c82f08d07883954f4b76435f4b8a5b470c1954573de"},
{file = "pytest_rerunfailures-11.1.2-py3-none-any.whl", hash = "sha256:d21fe2e46d9774f8ad95f1aa799544ae95cac3a223477af94aa985adfae92b7e"},
]
[package.dependencies]
packaging = ">=17.1"
pytest = ">=5.3"
[[package]]
name = "pytest-timeout"
version = "2.1.0"
@@ -2597,4 +2611,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "2515a9320c2960076012fbc036fb33c4f6a23515c8d143785931dc18c6722d91"
content-hash = "b689ffd6eae32b966f1744b5ac3343fe0dd26b31ee1f50e13daf5045ee0623e1"

View File

@@ -140,7 +140,7 @@ async fn auth_quirks(
impl BackendType<'_, ClientCredentials<'_>> {
/// Authenticate the client via the requested backend, possibly using credentials.
#[tracing::instrument(fields(allow_cleartext), skip_all)]
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(
&mut self,
extra: &ConsoleReqExtra<'_>,

View File

@@ -53,7 +53,7 @@ pub async fn password_hack(
.await?;
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project.into());
creds.project = Some(payload.project);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

View File

@@ -2,7 +2,7 @@
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::borrow::Cow;
use std::collections::HashSet;
use thiserror::Error;
use tracing::info;
@@ -19,11 +19,10 @@ pub enum ClientCredsParseError {
InconsistentProjectNames { domain: String, option: String },
#[error(
"SNI ('{}') inconsistently formatted with respect to common name ('{}'). \
SNI should be formatted as '<project-name>.{}'.",
.sni, .cn, .cn,
"Common name inferred from SNI ('{}') is not known",
.cn,
)]
InconsistentSni { sni: String, cn: String },
UnknownCommonName { cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(String),
@@ -37,7 +36,7 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials<'a> {
pub user: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<Cow<'a, str>>,
pub project: Option<String>,
}
impl ClientCredentials<'_> {
@@ -51,7 +50,7 @@ impl<'a> ClientCredentials<'a> {
pub fn parse(
params: &'a StartupMessageParams,
sni: Option<&str>,
common_name: Option<&str>,
common_names: Option<HashSet<String>>,
) -> Result<Self, ClientCredsParseError> {
use ClientCredsParseError::*;
@@ -60,37 +59,43 @@ impl<'a> ClientCredentials<'a> {
let user = get_param("user")?;
// Project name might be passed via PG's command-line options.
let project_option = params.options_raw().and_then(|mut options| {
options
.find_map(|opt| opt.strip_prefix("project="))
.map(Cow::Borrowed)
});
let project_option = params
.options_raw()
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.map(|name| name.to_string());
// Alternative project name is in fact a subdomain from SNI.
// NOTE: we do not consider SNI if `common_name` is missing.
let project_domain = sni
.zip(common_name)
.map(|(sni, cn)| {
subdomain_from_sni(sni, cn)
.ok_or_else(|| InconsistentSni {
sni: sni.into(),
cn: cn.into(),
let project_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
let common_name_from_sni = sni_str.split_once('.').map(|(_, domain)| domain);
let project = common_name_from_sni
.and_then(|domain| {
if cn.contains(domain) {
subdomain_from_sni(sni_str, domain)
} else {
None
}
})
.map(Cow::<'static, str>::Owned)
})
.transpose()?;
.ok_or_else(|| UnknownCommonName {
cn: common_name_from_sni.unwrap_or("").into(),
})?;
let project = match (project_option, project_domain) {
Some(project)
} else {
None
}
} else {
None
};
let project = match (project_option, project_from_domain) {
// Invariant: if we have both project name variants, they should match.
(Some(option), Some(domain)) if option != domain => {
Some(Err(InconsistentProjectNames {
domain: domain.into(),
option: option.into(),
}))
Some(Err(InconsistentProjectNames { domain, option }))
}
// Invariant: project name may not contain certain characters.
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
false => Err(MalformedProjectName(name.into())),
false => Err(MalformedProjectName(name)),
true => Ok(name),
}),
}
@@ -149,9 +154,9 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("foo.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_name)?;
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
@@ -177,24 +182,41 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);
let sni = Some("baz.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_name)?;
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("baz"));
Ok(())
}
#[test]
fn parse_multi_common_names() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe")]);
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
Ok(())
}
#[test]
fn parse_projects_different() {
let options =
StartupMessageParams::new([("user", "john_doe"), ("options", "project=first")]);
let sni = Some("second.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
@@ -209,13 +231,12 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_name = Some("example.com");
let common_names = Some(["example.com".into()].into());
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
match err {
InconsistentSni { sni, cn } => {
assert_eq!(sni, "project.localhost");
assert_eq!(cn, "example.com");
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
}
_ => panic!("bad error: {err:?}"),
}

View File

@@ -1,6 +1,12 @@
use crate::auth;
use anyhow::{bail, ensure, Context};
use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::{bail, ensure, Context, Ok};
use rustls::sign;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
@@ -16,7 +22,7 @@ pub struct MetricCollectionConfig {
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
pub common_name: Option<String>,
pub common_names: Option<HashSet<String>>,
}
impl TlsConfig {
@@ -26,28 +32,34 @@ impl TlsConfig {
}
/// Configure TLS for the main endpoint.
pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfig> {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
// add default certificate
cert_resolver.add_cert(key_path, cert_path)?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
for entry in std::fs::read_dir(certs_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// file names aligned with default cert-manager names
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
}
}
}
}
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let common_names = cert_resolver.get_common_names();
let config = rustls::ServerConfig::builder()
.with_safe_default_cipher_suites()
@@ -55,27 +67,116 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
// allow TLS 1.2 to be compatible with older client libraries
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?
.with_cert_resolver(Arc::new(cert_resolver))
.into();
// determine common name from tls-cert (-c server.crt param).
// used in asserting project name formatting invariant.
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
};
Ok(TlsConfig {
config,
common_name,
common_names: Some(common_names),
})
}
struct CertResolver {
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
}
impl CertResolver {
fn new() -> Self {
Self {
certs: HashMap::new(),
}
}
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
// We only use non-wildcard certificates in link proxy so it seems okay to treat them the same as
// wildcard ones as we don't use SNI there. That treatment only affects certificate selection, so
// verify-full will still check wildcard match. Old coding here just ignored non-wildcard common names
// and passed None instead, which blows up number of cases downstream code should handle. Proper coding
// here should better avoid Option for common_names, and do wildcard-based certificate selection instead
// of cutting off '*.' parts.
if common_name.starts_with("CN=*.") {
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
} else {
common_name.strip_prefix("CN=").map(|s| s.to_string())
}
}
.context(format!(
"Failed to parse common name from certificate at '{cert_path}'."
))?;
self.certs.insert(
common_name,
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
);
Ok(())
}
fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
}
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
_client_hello: rustls::server::ClientHello,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
//
// With the current coding foo.com will match *.foo.com and that
// repeats behavior of the old code.
if let Some(mut sni_name) = _client_hello.server_name() {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
}
}
} else {
None
}
}
}
/// Helper for cmdline cache options parsing.
pub struct CacheOptions {
/// Max number of entries.

View File

@@ -132,7 +132,11 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(key_path, cert_path)?),
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
key_path,
cert_path,
args.get_one::<String>("certs-dir"),
)?),
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
@@ -254,6 +258,12 @@ fn cli() -> clap::Command {
.alias("ssl-cert") // backwards compatibility
.help("path to TLS cert for client postgres connections"),
)
// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
.arg(
Arg::new("certs-dir")
.long("certs-dir")
.help("path to directory with TLS certificates for client postgres connections"),
)
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::collections::HashMap;
use tracing::{debug, error, info, instrument, trace};
use tracing::{error, info, instrument, trace, warn};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
@@ -84,10 +84,14 @@ fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let value = ms.get_counter().get_value() as u64;
debug!(
"branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
// Report if the metric value is suspiciously large
if value > (1u64 << 40) {
warn!(
"potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
}
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
@@ -124,11 +128,15 @@ async fn collect_metrics_iteration(
let mut value = *curr_val;
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
// Only send metrics updates if the metric has changed
if curr_val - prev_val > 0 {
// Only send metrics updates if the metric has increased
if curr_val > prev_val {
value = curr_val - prev_val;
start_time = *prev_time;
} else {
if curr_val < prev_val {
error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
prev_val, curr_val, curr_key);
}
return None;
}
};
@@ -189,7 +197,7 @@ async fn collect_metrics_iteration(
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 += send_metric.value;
e.0 = e.0.saturating_add(send_metric.value);
e.1 = stop_time
})
// cache new metric

View File

@@ -98,7 +98,7 @@ pub async fn task_main(
}
// TODO(tech debt): unite this with its twin below.
#[tracing::instrument(fields(session_id), skip_all)]
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
pub async fn handle_ws_client(
config: &'static ProxyConfig,
cancel_map: &CancelMap,
@@ -124,11 +124,11 @@ pub async fn handle_ws_client(
// Extract credentials which we're going to use for auth.
let creds = {
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_name))
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_names))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
@@ -140,7 +140,7 @@ pub async fn handle_ws_client(
.await
}
#[tracing::instrument(fields(session_id), skip_all)]
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
async fn handle_client(
config: &'static ProxyConfig,
cancel_map: &CancelMap,
@@ -163,11 +163,11 @@ async fn handle_client(
// Extract credentials which we're going to use for auth.
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_name))
.map(|_| auth::ClientCredentials::parse(&params, sni, common_names))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?

View File

@@ -54,9 +54,11 @@ fn generate_tls_config<'a>(
.with_single_cert(vec![cert], key)?
.into();
let common_names = Some([common_name.to_owned()].iter().cloned().collect());
TlsConfig {
config,
common_name: Some(common_name.to_string()),
common_names,
}
};

View File

@@ -26,7 +26,7 @@ prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^2.2.3"
pytest-order = "^1.0.1"
allure-pytest = "^2.10.0"
allure-pytest = "^2.13.1"
pytest-asyncio = "^0.19.0"
toml = "^0.10.2"
psutil = "^5.9.4"
@@ -34,6 +34,7 @@ types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
aiohttp = "3.7.4"
pytest-rerunfailures = "^11.1.2"
[tool.poetry.group.dev.dependencies]
black = "^23.1.0"
@@ -69,6 +70,9 @@ strict = true
module = [
"asyncpg.*",
"pg8000.*",
"allure.*",
"allure_commons.*",
"allure_pytest.*",
]
ignore_missing_imports = true

View File

@@ -8,13 +8,7 @@
# warnings and errors right in the editor.
# In vscode, this setting is Rust-analyzer>Check On Save:Command
# manual-range-contains wants
# !(4..=MAX_STARTUP_PACKET_LENGTH).contains(&len)
# instead of
# len < 4 || len > MAX_STARTUP_PACKET_LENGTH
# , let's disagree.
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -A clippy::manual-range-contains -D warnings
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -D warnings

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.66.1"
channel = "1.68.2"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -5,6 +5,7 @@ use anyhow::{bail, Context, Result};
use clap::Parser;
use remote_storage::RemoteStorageConfig;
use toml_edit::Document;
use utils::signals::ShutdownSignals;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
@@ -39,7 +40,7 @@ use utils::{
logging::{self, LogFormat},
project_git_version,
sentry_init::init_sentry,
signals, tcp_listener,
tcp_listener,
};
const PID_FILE_NAME: &str = "safekeeper.pid";
@@ -216,7 +217,6 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
@@ -274,15 +274,12 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
set_build_info_metric(GIT_VERSION);
// TODO: put more thoughts into handling of failed threads
// We probably should restart them.
// We should catch & die if they are in trouble.
// NOTE: we still have to handle signals like SIGQUIT to prevent coredumps
signals.handle(|signal| {
// TODO: implement graceful shutdown with joining threads etc
info!(
"received {}, terminating in immediate shutdown mode",
signal.name()
);
// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
ShutdownSignals::handle(|signal| {
info!("received {}, terminating", signal.name());
std::process::exit(0);
})
}

View File

@@ -242,6 +242,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,
};
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

View File

@@ -255,7 +255,7 @@ pub struct TimelineCollector {
epoch_start_lsn: GenericGaugeVec<AtomicU64>,
peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
feedback_ps_write_lsn: GenericGaugeVec<AtomicU64>,
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
timeline_active: GenericGaugeVec<AtomicU64>,
wal_backup_active: GenericGaugeVec<AtomicU64>,
@@ -339,15 +339,15 @@ impl TimelineCollector {
.unwrap();
descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
let feedback_ps_write_lsn = GenericGaugeVec::new(
let ps_last_received_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_feedback_ps_write_lsn",
"safekeeper_ps_last_received_lsn",
"Last LSN received by the pageserver, acknowledged in the feedback",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(feedback_ps_write_lsn.desc().into_iter().cloned());
descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
let feedback_last_time_seconds = GenericGaugeVec::new(
Opts::new(
@@ -458,7 +458,7 @@ impl TimelineCollector {
epoch_start_lsn,
peer_horizon_lsn,
remote_consistent_lsn,
feedback_ps_write_lsn,
ps_last_received_lsn,
feedback_last_time_seconds,
timeline_active,
wal_backup_active,
@@ -489,7 +489,7 @@ impl Collector for TimelineCollector {
self.epoch_start_lsn.reset();
self.peer_horizon_lsn.reset();
self.remote_consistent_lsn.reset();
self.feedback_ps_write_lsn.reset();
self.ps_last_received_lsn.reset();
self.feedback_last_time_seconds.reset();
self.timeline_active.reset();
self.wal_backup_active.reset();
@@ -514,11 +514,11 @@ impl Collector for TimelineCollector {
let timeline_id = tli.ttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
let mut most_advanced: Option<pq_proto::ReplicationFeedback> = None;
let mut most_advanced: Option<pq_proto::PageserverFeedback> = None;
for replica in tli.replicas.iter() {
if let Some(replica_feedback) = replica.pageserver_feedback {
if let Some(current) = most_advanced {
if current.ps_writelsn < replica_feedback.ps_writelsn {
if current.last_received_lsn < replica_feedback.last_received_lsn {
most_advanced = Some(replica_feedback);
}
} else {
@@ -568,11 +568,10 @@ impl Collector for TimelineCollector {
.set(tli.wal_storage.flush_wal_seconds);
if let Some(feedback) = most_advanced {
self.feedback_ps_write_lsn
self.ps_last_received_lsn
.with_label_values(labels)
.set(feedback.ps_writelsn);
if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
{
.set(feedback.last_received_lsn);
if let Ok(unix_time) = feedback.replytime.duration_since(SystemTime::UNIX_EPOCH) {
self.feedback_last_time_seconds
.with_label_values(labels)
.set(unix_time.as_secs());
@@ -599,7 +598,7 @@ impl Collector for TimelineCollector {
mfs.extend(self.epoch_start_lsn.collect());
mfs.extend(self.peer_horizon_lsn.collect());
mfs.extend(self.remote_consistent_lsn.collect());
mfs.extend(self.feedback_ps_write_lsn.collect());
mfs.extend(self.ps_last_received_lsn.collect());
mfs.extend(self.feedback_last_time_seconds.collect());
mfs.extend(self.timeline_active.collect());
mfs.extend(self.wal_backup_active.collect());

View File

@@ -18,7 +18,7 @@ use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use pq_proto::{ReplicationFeedback, SystemId};
use pq_proto::{PageserverFeedback, SystemId};
use utils::{
bin_ser::LeSer,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -360,7 +360,7 @@ pub struct AppendResponse {
// a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
pub pageserver_feedback: ReplicationFeedback,
pub pageserver_feedback: PageserverFeedback,
}
impl AppendResponse {
@@ -370,7 +370,7 @@ impl AppendResponse {
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
pageserver_feedback: PageserverFeedback::empty(),
}
}
}
@@ -708,7 +708,7 @@ where
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
pageserver_feedback: PageserverFeedback::empty(),
};
trace!("formed AppendResponse {:?}", ar);
ar

View File

@@ -11,7 +11,7 @@ use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody};
use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -319,11 +319,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
// pageserver sends this.
// Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
let buf = Bytes::copy_from_slice(&msg[9..]);
let reply = ReplicationFeedback::parse(buf);
let reply = PageserverFeedback::parse(buf);
trace!("ReplicationFeedback is {:?}", reply);
// Only pageserver sends ReplicationFeedback, so set the flag.
// This replica is the source of information to resend to compute.
trace!("PageserverFeedback is {:?}", reply);
self.feedback.pageserver_feedback = Some(reply);
self.tli

View File

@@ -4,7 +4,7 @@
use anyhow::{anyhow, bail, Result};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use pq_proto::ReplicationFeedback;
use pq_proto::PageserverFeedback;
use serde::Serialize;
use std::cmp::{max, min};
use std::path::PathBuf;
@@ -91,7 +91,7 @@ pub struct ReplicaState {
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
/// Replication specific feedback received from pageserver, if any
pub pageserver_feedback: Option<ReplicationFeedback>,
pub pageserver_feedback: Option<PageserverFeedback>,
}
impl Default for ReplicaState {
@@ -276,7 +276,7 @@ impl SharedState {
//
if let Some(pageserver_feedback) = state.pageserver_feedback {
if let Some(acc_feedback) = acc.pageserver_feedback {
if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn {
if acc_feedback.last_received_lsn < pageserver_feedback.last_received_lsn {
warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet.");
acc.pageserver_feedback = Some(pageserver_feedback);
}
@@ -287,12 +287,12 @@ impl SharedState {
// last lsn received by pageserver
// FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver.
// See https://github.com/neondatabase/neon/issues/1171
acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn);
acc.last_received_lsn = Lsn::from(pageserver_feedback.last_received_lsn);
// When at least one pageserver has preserved data up to remote_consistent_lsn,
// safekeeper is free to delete it, so choose max of all pageservers.
acc.remote_consistent_lsn = max(
Lsn::from(pageserver_feedback.ps_applylsn),
Lsn::from(pageserver_feedback.remote_consistent_lsn),
acc.remote_consistent_lsn,
);
}
@@ -337,6 +337,7 @@ impl SharedState {
safekeeper_connstr: conf.listen_pg_addr.clone(),
backup_lsn: self.sk.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
}
}
}
@@ -584,7 +585,7 @@ impl Timeline {
let replica_state = shared_state.replicas[replica_id].unwrap();
let reported_remote_consistent_lsn = replica_state
.pageserver_feedback
.map(|f| Lsn(f.ps_applylsn))
.map(|f| Lsn(f.remote_consistent_lsn))
.unwrap_or(Lsn::INVALID);
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
(reported_remote_consistent_lsn!= Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
@@ -673,7 +674,8 @@ impl Timeline {
bail!(TimelineError::Cancelled(self.ttid));
}
self.write_shared_state().sk.inmem.backup_lsn = backup_lsn;
let mut state = self.write_shared_state();
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
Ok(())

View File

@@ -323,7 +323,8 @@ impl WalBackupTask {
}
match backup_lsn_range(
backup_lsn,
&self.timeline,
&mut backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
@@ -331,13 +332,7 @@ impl WalBackupTask {
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("failed to set wal_backup_lsn: {}", e);
return;
}
Ok(()) => {
retry_attempt = 0;
}
Err(e) => {
@@ -354,20 +349,25 @@ impl WalBackupTask {
}
pub async fn backup_lsn_range(
start_lsn: Lsn,
timeline: &Arc<Timeline>,
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<Lsn> {
let mut res = start_lsn;
) -> Result<()> {
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir, workspace_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
res = s.end_lsn;
let new_backup_lsn = s.end_lsn;
timeline
.set_wal_backup_lsn(new_backup_lsn)
.context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn;
}
info!(
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
@@ -375,7 +375,7 @@ pub async fn backup_lsn_range(
end_lsn,
start_lsn,
);
Ok(res)
Ok(())
}
async fn backup_single_segment(

87
scripts/flaky_tests.py Executable file
View File

@@ -0,0 +1,87 @@
#! /usr/bin/env python3
import argparse
import json
import logging
from collections import defaultdict
from typing import DefaultDict, Dict
import psycopg2
import psycopg2.extras
# We call the test "flaky" if it failed at least once on the main branch in the last N=10 days.
FLAKY_TESTS_QUERY = """
SELECT
DISTINCT parent_suite, suite, test
FROM
(
SELECT
revision,
jsonb_array_elements(data -> 'children') -> 'name' as parent_suite,
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
FROM
regress_test_results
WHERE
reference = 'refs/heads/main'
) data
WHERE
timestamp > CURRENT_DATE - INTERVAL '%s' day
AND status::text IN ('"failed"', '"broken"')
;
"""
def main(args: argparse.Namespace):
connstr = args.connstr
interval_days = args.days
output = args.output
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
res = defaultdict(lambda: defaultdict(dict))
logging.info("connecting to the database...")
with psycopg2.connect(connstr, connect_timeout=10) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
logging.info("fetching flaky tests...")
cur.execute(FLAKY_TESTS_QUERY, (interval_days,))
rows = cur.fetchall()
for row in rows:
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")
res[row["parent_suite"]][row["suite"]][row["test"]] = True
logging.info(f"saving results to {output.name}")
json.dump(res, output, indent=2)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Detect flaky tests in the last N days")
parser.add_argument(
"--output",
type=argparse.FileType("w"),
default="flaky.json",
help="path to output json file (default: flaky.json)",
)
parser.add_argument(
"--days",
required=False,
default=10,
type=int,
help="how many days to look back for flaky tests (default: 10)",
)
parser.add_argument(
"connstr",
help="connection string to the test results database",
)
args = parser.parse_args()
level = logging.INFO
logging.basicConfig(
format="%(message)s",
level=level,
)
main(args)

View File

@@ -0,0 +1,125 @@
//
// The script parses Allure reports and posts a comment with a summary of the test results to the PR.
// It accepts an array of items and creates a comment with a summary for each one (for "release" and "debug", together or separately if any of them failed to be generated).
//
// The comment is updated on each run with the latest results.
//
// It is designed to be used with actions/github-script from GitHub Workflows:
// - uses: actions/github-script@v6
// with:
// script: |
// const script = require("./scripts/pr-comment-test-report.js")
// await script({
// github,
// context,
// fetch,
// reports: [{...}, ...], // each report is expected to have "buildType", "reportUrl", and "jsonUrl" properties
// })
//
module.exports = async ({ github, context, fetch, reports }) => {
// Marker to find the comment in the subsequent runs
const startMarker = `<!--AUTOMATIC COMMENT START #${context.payload.number}-->`
// GitHub bot id taken from (https://api.github.com/users/github-actions[bot])
const githubActionsBotId = 41898282
// The latest commit in the PR URL
const commitUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/pull/${context.payload.number}/commits/${context.payload.pull_request.head.sha}`
// Commend body itself
let commentBody = `${startMarker}\n### Test results for ${commitUrl}:\n___\n`
// Common parameters for GitHub API requests
const ownerRepoParams = {
owner: context.repo.owner,
repo: context.repo.repo,
}
for (const report of reports) {
const {buildType, reportUrl, jsonUrl} = report
if (!reportUrl || !jsonUrl) {
console.warn(`"reportUrl" or "jsonUrl" aren't set for ${buildType} build`)
continue
}
const suites = await (await fetch(jsonUrl)).json()
// Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests.
// For this report it's ok to treat them in the same way (as failed).
failedTests = []
passedTests = []
skippedTests = []
retriedTests = []
retriedStatusChangedTests = []
for (const parentSuite of suites.children) {
for (const suite of parentSuite.children) {
for (const test of suite.children) {
pytestName = `${parentSuite.name.replace(".", "/")}/${suite.name}.py::${test.name}`
test.pytestName = pytestName
if (test.status === "passed") {
passedTests.push(test);
} else if (test.status === "failed" || test.status === "broken") {
failedTests.push(test);
} else if (test.status === "skipped") {
skippedTests.push(test);
}
if (test.retriesCount > 0) {
retriedTests.push(test);
if (test.retriedStatusChangedTests) {
retriedStatusChangedTests.push(test);
}
}
}
}
}
const totalTestsCount = failedTests.length + passedTests.length + skippedTests.length
commentBody += `#### ${buildType} build: ${totalTestsCount} tests run: ${passedTests.length} passed, ${failedTests.length} failed, ${skippedTests.length} ([full report](${reportUrl}))\n`
if (failedTests.length > 0) {
commentBody += `Failed tests:\n`
for (const test of failedTests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}`
commentBody += `- [\`${test.pytestName}\`](${allureLink})`
if (test.retriesCount > 0) {
commentBody += ` (ran [${test.retriesCount + 1} times](${allureLink}/retries))`
}
commentBody += "\n"
}
commentBody += "\n"
}
if (retriedStatusChangedTests > 0) {
commentBody += `Flaky tests:\n`
for (const test of retriedStatusChangedTests) {
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
commentBody += `- ${status} [\`${test.pytestName}\`](${reportUrl}#suites/${test.parentUid}/${test.uid}/retries)\n`
}
commentBody += "\n"
}
commentBody += "___\n"
}
const { data: comments } = await github.rest.issues.listComments({
issue_number: context.payload.number,
...ownerRepoParams,
})
const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker))
if (comment) {
await github.rest.issues.updateComment({
comment_id: comment.id,
body: commentBody,
...ownerRepoParams,
})
} else {
await github.rest.issues.createComment({
issue_number: context.payload.number,
body: commentBody,
...ownerRepoParams,
})
}
}

2
scripts/sk_collect_dumps/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
result
*.json

View File

@@ -0,0 +1,25 @@
# Collect /v1/debug_dump from all safekeeper nodes
1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.
## How to use ansible (staging)
```
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
```
## How to use ansible (prod)
```
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
```

View File

@@ -0,0 +1,18 @@
- name: Fetch state dumps from safekeepers
hosts: safekeepers
gather_facts: False
remote_user: "{{ remote_user }}"
tasks:
- name: Download file
get_url:
url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
dest: "/tmp/{{ inventory_hostname }}.json"
- name: Fetch file from remote hosts
fetch:
src: "/tmp/{{ inventory_hostname }}.json"
dest: "./result/{{ inventory_hostname }}.json"
flat: yes
fail_on_missing: no

View File

@@ -0,0 +1,52 @@
#!/bin/bash
if [ -z "$DB_CONNSTR" ]; then
echo "DB_CONNSTR is not set"
exit 1
fi
# Create a temporary table for JSON data
psql $DB_CONNSTR -c 'DROP TABLE IF EXISTS tmp_json'
psql $DB_CONNSTR -c 'CREATE TABLE tmp_json (data jsonb)'
for file in ./result/*.json; do
echo "$file"
SK_ID=$(jq '.config.id' $file)
echo "SK_ID: $SK_ID"
jq -c ".timelines[] | . + {\"sk_id\": $SK_ID}" $file | psql $DB_CONNSTR -c "\\COPY tmp_json (data) FROM STDIN"
done
TABLE_NAME=$1
if [ -z "$TABLE_NAME" ]; then
echo "TABLE_NAME is not set, skipping conversion to table with typed columns"
echo "Usage: ./upload.sh TABLE_NAME"
exit 0
fi
psql $DB_CONNSTR <<EOF
CREATE TABLE $TABLE_NAME AS
SELECT
(data->>'sk_id')::bigint AS sk_id,
(data->>'tenant_id') AS tenant_id,
(data->>'timeline_id') AS timeline_id,
(data->'memory'->>'active')::bool AS active,
(data->'memory'->>'flush_lsn')::bigint AS flush_lsn,
(data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn,
(data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn,
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn,
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn,
(data->'memory'->>'write_lsn')::bigint AS write_lsn,
(data->'memory'->>'num_computes')::bigint AS num_computes,
(data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn,
(data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
(data->'memory'->>'is_cancelled')::bool AS is_cancelled,
(data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn,
(data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn,
(data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
(data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn,
(data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn,
(data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn,
(data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn
FROM tmp_json
EOF

View File

@@ -133,6 +133,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
peer_horizon_lsn: 5,
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
local_start_lsn: 0,
availability_zone: None,
};
counter += 1;
yield info;

View File

@@ -36,9 +36,11 @@ message SafekeeperTimelineInfo {
uint64 local_start_lsn = 9;
// A connection string to use for WAL receiving.
string safekeeper_connstr = 10;
// Availability zone of a safekeeper.
optional string availability_zone = 11;
}
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}
}

View File

@@ -33,6 +33,7 @@ use tonic::transport::server::Connected;
use tonic::Code;
use tonic::{Request, Response, Status};
use tracing::*;
use utils::signals::ShutdownSignals;
use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
@@ -437,6 +438,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);
// On any shutdown signal, log receival and exit.
std::thread::spawn(move || {
ShutdownSignals::handle(|signal| {
info!("received {}, terminating", signal.name());
std::process::exit(0);
})
});
let registry = Registry {
shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
timeline_chan_size: args.timeline_chan_size,
@@ -516,6 +525,7 @@ mod tests {
peer_horizon_lsn: 5,
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
local_start_lsn: 0,
availability_zone: None,
}
}

View File

@@ -4,4 +4,5 @@ pytest_plugins = (
"fixtures.pg_stats",
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.flaky",
)

View File

@@ -0,0 +1,58 @@
import json
from pathlib import Path
from typing import List
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from allure_commons.types import LabelType
from allure_pytest.utils import allure_name, allure_suite_labels
from fixtures.log_helper import log
"""
The plugin reruns flaky tests.
It uses `pytest.mark.flaky` provided by `pytest-rerunfailures` plugin and flaky tests detected by `scripts/flaky_tests.py`
Note: the logic of getting flaky tests is extracted to a separate script to avoid running it for each of N xdist workers
"""
def pytest_addoption(parser: Parser):
parser.addoption(
"--flaky-tests-json",
action="store",
type=Path,
help="Path to json file with flaky tests generated by scripts/flaky_tests.py",
)
def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]):
if not config.getoption("--flaky-tests-json"):
return
# Any error with getting flaky tests aren't critical, so just do not rerun any tests
flaky_json = config.getoption("--flaky-tests-json")
if not flaky_json.exists():
return
content = flaky_json.read_text()
try:
flaky_tests = json.loads(content)
except ValueError:
log.error(f"Can't parse {content} as json")
return
for item in items:
# Use the same logic for constructing test name as Allure does (we store allure-provided data in DB)
# Ref https://github.com/allure-framework/allure-python/blob/2.13.1/allure-pytest/src/listener.py#L98-L100
allure_labels = dict(allure_suite_labels(item))
parent_suite = str(allure_labels.get(LabelType.PARENT_SUITE))
suite = str(allure_labels.get(LabelType.SUITE))
params = item.callspec.params if hasattr(item, "callspec") else {}
name = allure_name(item, params)
if flaky_tests.get(parent_suite, {}).get(suite, {}).get(name, False):
# Rerun 3 times = 1 original run + 2 reruns
log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times")
item.add_marker(pytest.mark.flaky(reruns=2))

View File

@@ -14,9 +14,9 @@ import tempfile
import textwrap
import time
import uuid
from collections import defaultdict
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from enum import Flag, auto
from functools import cached_property
from itertools import chain, product
@@ -43,11 +43,12 @@ from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
Fn,
allure_add_grafana_links,
allure_attach_from_dir,
get_self_dir,
subprocess_capture,
@@ -1118,538 +1119,6 @@ def neon_env_builder(
yield builder
class PageserverApiException(Exception):
def __init__(self, message, status_code: int):
super().__init__(message)
self.status_code = status_code
class PageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()["msg"]
except: # noqa: E722
msg = ""
raise PageserverApiException(msg, res.status_code) from e
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):
params = {}
if detach_ignored:
params["detach_ignored"] = "true"
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_load(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
self.verbose_error(res)
def tenant_ignore(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore")
self.verbose_error(res)
def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_config(self, tenant_id: TenantId) -> TenantConfig:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config")
self.verbose_error(res)
return TenantConfig.from_json(res.json())
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
assert "tenant_id" not in config.keys()
res = self.put(
f"http://localhost:{self.port}/v1/tenant/config",
json={**config, "tenant_id": str(tenant_id)},
)
self.verbose_error(res)
def patch_tenant_config_client_side(
self,
tenant_id: TenantId,
inserts: Optional[Dict[str, Any]] = None,
removes: Optional[List[str]] = None,
):
current = self.tenant_config(tenant_id).tenant_specific_overrides
if inserts is not None:
current.update(inserts)
if removes is not None:
for key in removes:
del current[key]
self.set_tenant_config(tenant_id, current)
def tenant_size(self, tenant_id: TenantId) -> int:
return self.tenant_size_and_modelinputs(tenant_id)[0]
def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]:
"""
Returns the tenant size, together with the model inputs as the second tuple item.
"""
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size")
self.verbose_error(res)
res = res.json()
assert isinstance(res, dict)
assert TenantId(res["id"]) == tenant_id
size = res["size"]
assert type(size) == int
inputs = res["inputs"]
assert type(inputs) is dict
return (size, inputs)
def tenant_size_debug(self, tenant_id: TenantId) -> str:
"""
Returns the tenant size debug info, as an HTML string
"""
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size",
headers={"Accept": "text/html"},
)
return res.text
def timeline_list(
self,
tenant_id: TenantId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
) -> List[Dict[str, Any]]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
) -> Dict[Any, Any]:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline",
json={
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_detail(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
**kwargs,
) -> Dict[Any, Any]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params=params,
**kwargs,
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
)
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_gc(
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
self.is_testing_enabled_or_skip()
log.info(
f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc",
json={"gc_horizon": gc_horizon},
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_spawn_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
) -> dict[str, Any]:
body = {
"max_concurrent_downloads": max_concurrent_downloads,
}
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
json=body,
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_poll_download_remote_layers_status(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
spawn_response: dict[str, Any],
poll_state=None,
) -> None | dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
# assumption in this API client here is that nobody else spawns the task
assert res_json["task_id"] == spawn_response["task_id"]
if poll_state is None or res_json["state"] == poll_state:
return res_json
return None
def timeline_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
errors_ok=False,
at_least_one_download=True,
):
res = self.timeline_spawn_download_remote_layers(
tenant_id, timeline_id, max_concurrent_downloads
)
while True:
completed = self.timeline_poll_download_remote_layers_status(
tenant_id, timeline_id, res, poll_state="Completed"
)
if not completed:
time.sleep(0.1)
continue
if not errors_ok:
assert completed["failed_download_count"] == 0
if at_least_one_download:
assert completed["successful_download_count"] > 0
return completed
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
return res.text
def get_metrics(self) -> Metrics:
res = self.get_metrics_str()
return parse_metrics(res)
def get_timeline_metric(
self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
) -> float:
metrics = self.get_metrics()
return metrics.query_one(
metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
).value
def get_remote_timeline_client_metric(
self,
metric_name: str,
tenant_id: TenantId,
timeline_id: TimelineId,
file_kind: str,
op_kind: str,
) -> Optional[float]:
metrics = self.get_metrics()
matches = metrics.query_all(
name=metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
)
if len(matches) == 0:
value = None
elif len(matches) == 1:
value = matches[0].value
assert value is not None
else:
assert len(matches) < 2, "above filter should uniquely identify metric"
return value
def get_metric_value(
self, name: str, filter: Optional[Dict[str, str]] = None
) -> Optional[float]:
metrics = self.get_metrics()
results = metrics.query_all(name, filter=filter)
if not results:
log.info(f'could not find metric "{name}"')
return None
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
return results[0].value
def layer_map_info(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> LayerMapInfo:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/",
)
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
def disk_usage_eviction_run(self, request: dict[str, Any]):
res = self.put(
f"http://localhost:{self.port}/v1/disk_usage_eviction/run",
json=request,
)
self.verbose_error(res)
return res.json()
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
@dataclass
class TenantConfig:
tenant_specific_overrides: Dict[str, Any]
effective_config: Dict[str, Any]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> TenantConfig:
return TenantConfig(
tenant_specific_overrides=d["tenant_specific_overrides"],
effective_config=d["effective_config"],
)
@dataclass
class LayerMapInfo:
in_memory_layers: List[InMemoryLayerInfo]
historic_layers: List[HistoricLayerInfo]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo:
info = LayerMapInfo(in_memory_layers=[], historic_layers=[])
json_in_memory_layers = d["in_memory_layers"]
assert isinstance(json_in_memory_layers, List)
for json_in_memory_layer in json_in_memory_layers:
info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer))
json_historic_layers = d["historic_layers"]
assert isinstance(json_historic_layers, List)
for json_historic_layer in json_historic_layers:
info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer))
return info
def kind_count(self) -> Dict[str, int]:
counts: Dict[str, int] = defaultdict(int)
for inmem_layer in self.in_memory_layers:
counts[inmem_layer.kind] += 1
for hist_layer in self.historic_layers:
counts[hist_layer.kind] += 1
return counts
@dataclass
class InMemoryLayerInfo:
kind: str
lsn_start: str
lsn_end: Optional[str]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo:
return InMemoryLayerInfo(
kind=d["kind"],
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
)
@dataclass(frozen=True)
class HistoricLayerInfo:
kind: str
layer_file_name: str
layer_file_size: Optional[int]
lsn_start: str
lsn_end: Optional[str]
remote: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
return HistoricLayerInfo(
kind=d["kind"],
layer_file_name=d["layer_file_name"],
layer_file_size=d.get("layer_file_size"),
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
)
@dataclass
class PageserverPort:
pg: int
@@ -2436,10 +1905,16 @@ def remote_pg(
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
start_ms = int(datetime.utcnow().timestamp() * 1000)
with RemotePostgres(pg_bin, connstr) as remote_pg:
yield remote_pg
end_ms = int(datetime.utcnow().timestamp() * 1000)
host = parse_dsn(connstr).get("host", "")
if host.endswith(".neon.build"):
# Add 10s margin to the start and end times
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
class PSQL:
"""
@@ -3378,151 +2853,6 @@ def check_restored_datadir_content(
assert (mismatch, error) == ([], [])
def wait_until(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
def wait_while(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns false, or throws an exception.
"""
for i in range(number_of_iterations):
try:
if not func():
return
log.info("waiting for %s iteration %s failed", func, i + 1)
time.sleep(interval)
continue
except Exception:
return
raise Exception("timed out while waiting for %s" % func)
def assert_tenant_status(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, expected_status: str
):
tenant_status = pageserver_http_client.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status
def tenant_exists(ps_http: PageserverHttpClient, tenant_id: TenantId):
tenants = ps_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]
def remote_consistent_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
if detail["remote_consistent_lsn"] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_upload(
pageserver_http_client: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
):
"""waits for local timeline upload up to specified lsn"""
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
)
# Does not use `wait_until` for debugging purposes
def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
expected_state: str,
iterations: int,
) -> bool:
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(1)
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_last_record_lsn(
pageserver_http_client: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10):
current_lsn = last_record_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
log.info(
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
)
def wait_for_last_flush_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
@@ -3584,23 +2914,3 @@ def wait_for_sk_commit_lsn_to_reach_remote_storage(
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_upload_queue_empty(
pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
):
ps_http = pageserver.http_client()
while True:
all_metrics = ps_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)

View File

@@ -0,0 +1,545 @@
from __future__ import annotations
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import requests
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import Fn
class PageserverApiException(Exception):
def __init__(self, message, status_code: int):
super().__init__(message)
self.status_code = status_code
@dataclass
class InMemoryLayerInfo:
kind: str
lsn_start: str
lsn_end: Optional[str]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo:
return InMemoryLayerInfo(
kind=d["kind"],
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
)
@dataclass(frozen=True)
class HistoricLayerInfo:
kind: str
layer_file_name: str
layer_file_size: Optional[int]
lsn_start: str
lsn_end: Optional[str]
remote: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
return HistoricLayerInfo(
kind=d["kind"],
layer_file_name=d["layer_file_name"],
layer_file_size=d.get("layer_file_size"),
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
)
@dataclass
class LayerMapInfo:
in_memory_layers: List[InMemoryLayerInfo]
historic_layers: List[HistoricLayerInfo]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo:
info = LayerMapInfo(in_memory_layers=[], historic_layers=[])
json_in_memory_layers = d["in_memory_layers"]
assert isinstance(json_in_memory_layers, List)
for json_in_memory_layer in json_in_memory_layers:
info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer))
json_historic_layers = d["historic_layers"]
assert isinstance(json_historic_layers, List)
for json_historic_layer in json_historic_layers:
info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer))
return info
def kind_count(self) -> Dict[str, int]:
counts: Dict[str, int] = defaultdict(int)
for inmem_layer in self.in_memory_layers:
counts[inmem_layer.kind] += 1
for hist_layer in self.historic_layers:
counts[hist_layer.kind] += 1
return counts
@dataclass
class TenantConfig:
tenant_specific_overrides: Dict[str, Any]
effective_config: Dict[str, Any]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> TenantConfig:
return TenantConfig(
tenant_specific_overrides=d["tenant_specific_overrides"],
effective_config=d["effective_config"],
)
class PageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()["msg"]
except: # noqa: E722
msg = ""
raise PageserverApiException(msg, res.status_code) from e
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):
params = {}
if detach_ignored:
params["detach_ignored"] = "true"
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_load(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
self.verbose_error(res)
def tenant_ignore(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore")
self.verbose_error(res)
def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_config(self, tenant_id: TenantId) -> TenantConfig:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config")
self.verbose_error(res)
return TenantConfig.from_json(res.json())
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
assert "tenant_id" not in config.keys()
res = self.put(
f"http://localhost:{self.port}/v1/tenant/config",
json={**config, "tenant_id": str(tenant_id)},
)
self.verbose_error(res)
def patch_tenant_config_client_side(
self,
tenant_id: TenantId,
inserts: Optional[Dict[str, Any]] = None,
removes: Optional[List[str]] = None,
):
current = self.tenant_config(tenant_id).tenant_specific_overrides
if inserts is not None:
current.update(inserts)
if removes is not None:
for key in removes:
del current[key]
self.set_tenant_config(tenant_id, current)
def tenant_size(self, tenant_id: TenantId) -> int:
return self.tenant_size_and_modelinputs(tenant_id)[0]
def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]:
"""
Returns the tenant size, together with the model inputs as the second tuple item.
"""
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size")
self.verbose_error(res)
res = res.json()
assert isinstance(res, dict)
assert TenantId(res["id"]) == tenant_id
size = res["size"]
assert type(size) == int
inputs = res["inputs"]
assert type(inputs) is dict
return (size, inputs)
def tenant_size_debug(self, tenant_id: TenantId) -> str:
"""
Returns the tenant size debug info, as an HTML string
"""
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size",
headers={"Accept": "text/html"},
)
return res.text
def timeline_list(
self,
tenant_id: TenantId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
) -> List[Dict[str, Any]]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
) -> Dict[Any, Any]:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline",
json={
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_detail(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
**kwargs,
) -> Dict[Any, Any]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params=params,
**kwargs,
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
)
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_gc(
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
self.is_testing_enabled_or_skip()
log.info(
f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc",
json={"gc_horizon": gc_horizon},
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_spawn_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
) -> dict[str, Any]:
body = {
"max_concurrent_downloads": max_concurrent_downloads,
}
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
json=body,
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_poll_download_remote_layers_status(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
spawn_response: dict[str, Any],
poll_state=None,
) -> None | dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
# assumption in this API client here is that nobody else spawns the task
assert res_json["task_id"] == spawn_response["task_id"]
if poll_state is None or res_json["state"] == poll_state:
return res_json
return None
def timeline_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
errors_ok=False,
at_least_one_download=True,
):
res = self.timeline_spawn_download_remote_layers(
tenant_id, timeline_id, max_concurrent_downloads
)
while True:
completed = self.timeline_poll_download_remote_layers_status(
tenant_id, timeline_id, res, poll_state="Completed"
)
if not completed:
time.sleep(0.1)
continue
if not errors_ok:
assert completed["failed_download_count"] == 0
if at_least_one_download:
assert completed["successful_download_count"] > 0
return completed
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
return res.text
def get_metrics(self) -> Metrics:
res = self.get_metrics_str()
return parse_metrics(res)
def get_timeline_metric(
self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
) -> float:
metrics = self.get_metrics()
return metrics.query_one(
metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
).value
def get_remote_timeline_client_metric(
self,
metric_name: str,
tenant_id: TenantId,
timeline_id: TimelineId,
file_kind: str,
op_kind: str,
) -> Optional[float]:
metrics = self.get_metrics()
matches = metrics.query_all(
name=metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
)
if len(matches) == 0:
value = None
elif len(matches) == 1:
value = matches[0].value
assert value is not None
else:
assert len(matches) < 2, "above filter should uniquely identify metric"
return value
def get_metric_value(
self, name: str, filter: Optional[Dict[str, str]] = None
) -> Optional[float]:
metrics = self.get_metrics()
results = metrics.query_all(name, filter=filter)
if not results:
log.info(f'could not find metric "{name}"')
return None
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
return results[0].value
def layer_map_info(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> LayerMapInfo:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/",
)
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
def disk_usage_eviction_run(self, request: dict[str, Any]):
res = self.put(
f"http://localhost:{self.port}/v1/disk_usage_eviction/run",
json=request,
)
self.verbose_error(res)
return res.json()
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)

View File

@@ -0,0 +1,145 @@
import time
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
def assert_tenant_status(
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
):
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
tenants = pageserver_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]
def remote_consistent_lsn(
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http.timeline_detail(tenant, timeline)
if detail["remote_consistent_lsn"] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_upload(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
):
"""waits for local timeline upload up to specified lsn"""
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
)
def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
expected_state: str,
iterations: int,
) -> bool:
"""
Does not use `wait_until` for debugging purposes
"""
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(1)
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
):
wait_until_tenant_state(
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
)
def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_last_record_lsn(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
log.info(
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
)
def wait_for_upload_queue_empty(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
while True:
all_metrics = pageserver_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)

Some files were not shown because too many files have changed in this diff Show More