Compare commits

...

72 Commits

Author SHA1 Message Date
Alek Westover
9317aec580 not working 2023-07-19 11:21:56 -04:00
Alek Westover
f025f3adbf WIP 2023-07-19 10:35:29 -04:00
Alek Westover
b7a8eba4de revert to python script 2023-07-18 15:14:29 -04:00
Alek Westover
f8c69804f7 manually upload index file 2023-07-12 13:29:14 -04:00
Alek Westover
550d0ef946 get rid of container 2023-07-12 12:05:16 -04:00
Alek Westover
88c5c5b41e make it bash not python 2023-07-12 11:42:35 -04:00
Alek Westover
cc8bc91881 style python 2023-07-12 10:29:42 -04:00
Alek Westover
ebc48ed67c delete notes file 2023-07-12 10:26:44 -04:00
Alek Westover
a6a88d2b9f fix path 2023-07-12 10:24:30 -04:00
Alek Westover
7619a169c1 I guess poetry needs a containr to run int 2023-07-11 14:34:16 -04:00
Alek Westover
e9700d4022 fix style 2023-07-11 14:22:50 -04:00
Alek Westover
5d1c4d10fa I much preffer this organizational scheme 2023-07-11 14:21:06 -04:00
Alek Westover
4fecf82291 black style 2023-07-11 13:58:26 -04:00
Alek Westover
f37980a026 fix directory structure 2023-07-11 13:52:19 -04:00
Alek Westover
13fa5285af remove recursive flag 2023-07-11 12:53:12 -04:00
Alek Westover
f132904546 actually commnet out logic 2023-07-11 12:14:47 -04:00
Alek Westover
ad582470e4 fix bad thing 2023-07-11 10:47:22 -04:00
Alek Westover
de8b914bfe Merge branch 'zip_ext' of github.com:neondatabase/neon into zip_ext 2023-07-11 10:46:40 -04:00
Alek Westover
005e9eee3c upload files 2023-07-11 10:46:23 -04:00
Alek Westover
fdcbadd552 style fix 2023-07-10 18:47:12 -04:00
Alek Westover
e88a3ae640 Merge branch 'zip_ext' of github.com:neondatabase/neon into zip_ext 2023-07-10 18:42:55 -04:00
Alek Westover
8c634622cf try to put the files in the bucket 2023-07-10 18:42:45 -04:00
Alek Westover
7bfde488b6 fixed python script 2023-07-10 13:59:00 -04:00
Alek Westover
26a1daf1bf remember to create and upload index.json 2023-07-10 13:46:32 -04:00
Alek Westover
b06dcaa88a fix again 2023-07-10 13:13:46 -04:00
Alek Westover
4b96e19e44 move script file 2023-07-10 13:13:01 -04:00
Alek Westover
d33bfb130d fix pahs 2023-07-10 13:10:01 -04:00
Alek Westover
8a40044659 merge 2023-07-10 12:39:23 -04:00
Alek Westover
7ccdd0c14e fix typo 2023-07-10 12:38:22 -04:00
Alek Westover
ce14c50a4b try to create folder 2023-07-10 11:41:30 -04:00
Alek Westover
ed95dc1c7a try to do it with just copy-pasting a lot of code 2023-07-10 11:04:10 -04:00
Alek Westover
7f028266b7 control 2023-07-07 13:48:37 -04:00
Alek Westover
e308de8224 dont zip control files 2023-07-07 13:46:43 -04:00
Alek Westover
ae349c7874 fix syntax error 2023-07-07 13:40:55 -04:00
Alek Westover
807ca84e54 copy lib tar gz 2023-07-07 13:37:14 -04:00
Alek Westover
f07c03d5ff fix 2023-07-07 13:15:00 -04:00
Alek Westover
977572bff8 fix 2023-07-07 13:11:45 -04:00
Alek Westover
ed183d1720 change to targz 2023-07-07 11:24:30 -04:00
Alek Westover
afa797d534 zip anon extension. only upload zipped extensions 2023-07-07 11:14:03 -04:00
Dmitry Rodionov
20137d9588 Polish tracing helpers (#4651)
Context: comments here: https://github.com/neondatabase/neon/pull/4645
2023-07-06 19:49:14 +03:00
Arseny Sher
634be4f4e0 Fix async write in safekeepers.
General Rust Write trait semantics (as well as its async brother) is that write
definitely happens only after Write::flush(). This wasn't needed in sync where
rust write calls the syscall directly, but is required in async.

Also fix setting initial end_pos in walsender, sometimes it was from the future.

fixes https://github.com/neondatabase/neon/issues/4518
2023-07-06 19:56:28 +04:00
Alex Chi Z
d340cf3721 dump more info in layer map (#4567)
A simple commit extracted from
https://github.com/neondatabase/neon/pull/4539

This PR adds more info for layer dumps (is_delta, is_incremental, size).

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-06 18:21:45 +03:00
Konstantin Knizhnik
1741edf933 Pageserver reconnect v2 (#4519)
## Problem

Compute is not always able to reconnect to pages server.
First of all it caused by long time of restart of pageserver.
So number of attempts is increased from 5 (hardcoded) to 60 (GUC).
Also we do not perform flush after each command to increase performance
(it is especially critical for prefetch).
Unfortunately such pending flush makes it not possible to transparently
reconnect to restarted pageserver.
What we can do is to try to minimzie such probabilty.
Most likely broken connection will be detected in first sens command
after some idle period.
This is why max_flush_delay parameter is added which force flush to be
performed after first request after some idle period.

See #4497

## Summary of changes

Add neon.max_reconnect_attempts and neon.max_glush_delay GUCs which
contol when flush has to be done
and when it is possible to try to reconnect to page server


## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-07-06 12:47:43 +03:00
Joonas Koivunen
269e20aeab fix: filter out zero synthetic sizes (#4639)
Apparently sending the synthetic_size == 0 is causing problems, so
filter out sending zeros.

Slack discussion:
https://neondb.slack.com/archives/C03F5SM1N02/p1688574285718989?thread_ts=1687874910.681049&cid=C03F5SM1N02
2023-07-06 12:32:34 +03:00
Tomoka Hayashi
91435006bd Fix docker-compose file and document (#4621)
## Problem

- Running the command according to docker.md gives warning and error.
- Warning `permissions should be u=rw (0600) or less` is output when
executing `psql -h localhost -p 55433 -U cloud_admin`.
- `FATAL: password authentication failed for user "root”` is output in
compute logs.

## Summary of changes

- Add `$ chmod 600 ~/.pgpass` in docker.md to avoid warning.
- Add username (cloud_admin) to pg_isready command in docker-compose.yml
to avoid error.

---------

Co-authored-by: Tomoka Hayashi <tomoka.hayashi@ntt.com>
2023-07-06 10:11:24 +01:00
Dmitry Rodionov
b263510866 move some logical size bits to separate logical_size.rs 2023-07-06 11:58:41 +03:00
Dmitry Rodionov
e418fc6dc3 move some tracing related assertions to separate module for tenant and timeline 2023-07-06 11:58:41 +03:00
Dmitry Rodionov
434eaadbe3 Move uninitialized timeline from tenant.rs to timeline/uninit.rs 2023-07-06 11:58:41 +03:00
Alexander Bayandin
6fb7edf494 Compile pg_embedding extension (#4634)
```
CREATE EXTENSION embedding;
CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
INSERT INTO t (val) VALUES (array[1,2,4]);

SELECT * FROM t ORDER BY val <-> array[3,3,3];
   val   
---------
 {1,2,3}
 {1,2,4}
 {1,1,1}
 {0,0,0}
 
(5 rows)
```
2023-07-05 18:40:25 +01:00
Christian Schwarz
505aa242ac page cache: add size metrics (#4629)
Make them a member of `struct PageCache` to prepare for a future
where there's no global state.
2023-07-05 15:36:42 +03:00
arpad-m
1c516906e7 Impl Display for LayerFileName and require it for Layer (#4630)
Does three things:

* add a `Display` impl for `LayerFileName` equal to the `short_id`
* based on that, replace the `Layer::short_id` function by a requirement
for a `Display` impl
* use that `Display` impl in the places where the `short_id` and `file_name()` functions were used instead

Fixes #4145
2023-07-05 14:27:50 +02:00
Christian Schwarz
7d7cd8375c callers of task_mgr::spawn: some top-level async blocks were missing tenant/timeline id (#4283)
Looking at logs from staging and prod, I found there are a bunch of log
lines without tenant / timeline context.

Manully walk through all task_mgr::spawn lines and fix that using the
least amount of work required.

While doing it, remove some redundant `shutting down` messages. 

refs https://github.com/neondatabase/neon/issues/4222
2023-07-05 14:04:05 +02:00
Vadim Kharitonov
c92b7543b5 Update pgvector to 0.4.4 (#4632)
After announcing `hnsw`, there is a hypothesis that the community will
start comparing it with `pgvector` by themselves. Therefore, let's have
an actual version of `pgvector` in Neon.
2023-07-05 13:39:51 +03:00
Stas Kelvich
dbf88cf2d7 Minimalistic pool for http endpoint compute connections (under opt-in flag)
Cache up to 20 connections per endpoint. Once all pooled connections
are used current implementation can open an extra connection, so the
maximum number of simultaneous connections is not enforced.

There are more things to do here, especially with background clean-up
of closed connections, and checks for transaction state. But current
implementation allows to check for smaller coonection latencies that
this cache should bring.
2023-07-05 12:00:03 +03:00
Konstantin Knizhnik
f1db87ac36 Check if there is enough memory for HNSW index (#4602)
## Problem

HNSW index is created in memory.
Try to prevent OOM by checking of available RAM.


## Summary of changes

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-07-05 11:40:38 +03:00
Christian Schwarz
3f9defbfb4 page cache: add access & hit rate metrics (#4628)
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
2023-07-05 10:38:32 +02:00
bojanserafimov
c7143dbde6 compute_ctl: Fix misleading metric (#4608) 2023-07-04 19:07:36 -04:00
Stas Kelvich
cbf9a40889 Set a shorter timeout for the initial connection attempti in proxy.
In case we try to connect to an outdated address that is no longer valid, the
default behavior of Kubernetes is to drop the packets, causing us to wait for
the entire timeout period. We want to fail fast in such cases.

A specific case to consider is when we have cached compute node information
with a 5-minute TTL (Time To Live), but the user has executed a `/suspend` API
call, resulting in the nonexistence of the compute node.
2023-07-04 20:34:22 +03:00
Joonas Koivunen
10aba174c9 metrics: Remove comments regarding upgradeable rwlocks (#4622)
Closes #4001 by removing the comments alluding towards
upgradeable/downgradeable RwLocks.
2023-07-04 17:40:51 +03:00
Conrad Ludgate
ab2ea8cfa5 use pbkdf2 crate (#4626)
## Problem

While pbkdf2 is a simple algorithm, we should probably use a well tested
implementation

## Summary of changes

* Use pbkdf2 crate
* Use arrays like the hmac comment says

## Checklist before requesting a review

- [X] I have performed a self-review of my code.
- [X] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-07-04 14:54:59 +01:00
arpad-m
9c8c55e819 Add _cached and _bytes to pageserver_tenant_synthetic_size metric name (#4616)
This renames the `pageserver_tenant_synthetic_size` metric to
`pageserver_tenant_synthetic_cached_size_bytes`, as was requested on
slack (link in the linked issue).

* `_cached` to hint that it is not incrementally calculated
* `_bytes` to indicate the unit the size is measured in

Fixes #3748
2023-07-03 19:34:07 +02:00
Conrad Ludgate
10110bee69 fix setup instructions (#4615)
## Problem

1. The local endpoints provision 2 ports (postgres and HTTP) which means
the migration_check endpoint has a different port than what the setup
implies
2. psycopg2-binary 2.9.3 has a deprecated poetry config and doesn't
install.

## Summary of changes

Update psycopg2-binary and update the endpoint ports in the readme

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-07-03 18:10:44 +03:00
Joonas Koivunen
cff7ae0b0d fix: no more ansi colored logs (#4613)
Allure does not support ansi colored logs, yet `compute_ctl` has them.

Upgrade criterion to get rid of atty dependency, disable ansi colors,
remove atty dependency and disable ansi feature of tracing-subscriber.

This is a heavy-handed approach. I am not aware of a workflow where
you'd want to connect a terminal directly to for example `compute_ctl`,
usually you find the logs in a file. If someone had been using colors,
they will now need to:
- turn the `tracing-subscriber.default-features` to `true`
- edit their wanted project to have colors

I decided to explicitly disable ansi colors in case we would have in
future a dependency accidentally enabling the feature on
`tracing-subscriber`, which would be quite surprising but not
unimagineable.

By getting rid of `atty` from dependencies we get rid of
<https://github.com/advisories/GHSA-g98v-hv3f-hcfr>.
2023-07-03 16:37:02 +03:00
Alexander Bayandin
78a7f68902 Make pg_version and build_type regular parameters (#4311)
## Problem

All tests have already been parametrised by Postgres version and build
type (to have them distinguishable in the Allure report), but despite
it, it's anyway required to have DEFAULT_PG_VERSION and BUILD_TYPE env
vars set to corresponding values, for example to
run`test_timeline_deletion_with_files_stuck_in_upload_queue[release-pg14-local_fs]`
test it's required to set `DEFAULT_PG_VERSION=14` and
`BUILD_TYPE=release`.
This PR makes the test framework pick up parameters from the test name
itself.

## Summary of changes
- Postgres version and build type related fixtures now are
function-scoped (instead of being sessions scoped before)
- Deprecate `--pg-version` argument in favour of DEFAULT_PG_VERSION env
variable (it's easier to parse)
- GitHub autocomment now includes only one command with all the failed
tests + runs them in parallel
2023-07-03 13:51:40 +01:00
Christian Schwarz
24eaa3b7ca timeline creation: reflect failures due to ancestor LSN issues in status code (#4600)
Before, it was a `500` and control plane would retry, wereas it actually
should have stopped retrying.

(Stacked on top of https://github.com/neondatabase/neon/pull/4597 )

fixes https://github.com/neondatabase/neon/issues/4595
part of https://github.com/neondatabase/cloud/issues/5626

---------

Co-authored-by: Shany Pozin <shany@neon.tech>
2023-07-03 15:21:10 +03:00
Shany Pozin
26828560a8 Add timeouts and retries to consumption metrics reporting client (#4563)
## Problem
#4528
## Summary of changes
Add a 60 seconds default timeout to the reqwest client 
Add retries for up to 3 times to call into the metric consumption
endpoint

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-07-03 15:20:49 +03:00
Alek Westover
86604b3b7d Delete Unnecessary files in Extension Bucket (#4606)
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-07-03 07:37:30 -04:00
Joonas Koivunen
4957bb2d48 fix(proxy): stray span enter globbers up logs (#4612)
Prod logs have deep accidential span nesting. Introduced in #3759 and
has been untouched since, maybe no one watches proxy logs? :) I found it
by accident when looking to see if proxy logs have ansi colors with
`{neon_service="proxy"}`.

The solution is to mostly stop using `Span::enter` or `Span::entered` in
async code. Kept on `Span::entered` in cancel on shutdown related path.
2023-07-03 11:53:57 +01:00
Sasha Krassovsky
ff1a1aea86 Make control plane connector send encrypted password (#4607)
Control plane needs the encrypted hash that postgres itself generates
2023-06-30 14:17:44 -07:00
Em Sharnoff
c9f05d418d Bump vm-builder v0.11.0 -> v0.11.1 (#4605)
This applies the fix from https://github.com/neondatabase/autoscaling/pull/367,
which should resolve the "leaking cloud_admin connections" issue that
has been  observed for some customers.
2023-06-30 23:49:06 +03:00
bojanserafimov
9de1a6fb14 cold starts: Run sync_safekeepers on compute_ctl shutdown (#4588) 2023-06-30 16:29:47 -04:00
Konstantin Knizhnik
fbd37740c5 Make file_cache logging less verbose (#4601)
## Problem


Message "set local file cache limit to..." polutes compute logs.

## Summary of changes

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-06-30 21:55:56 +03:00
82 changed files with 1988 additions and 1086 deletions

View File

@@ -21,4 +21,5 @@
!workspace_hack/
!neon_local/
!scripts/ninstall.sh
!scripts/combine_control_files.py
!vm-cgconfig.conf

View File

@@ -767,7 +767,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.11.0
VM_BUILDER_VERSION: v0.11.1
steps:
- name: Checkout
@@ -917,9 +917,9 @@ jobs:
run: rm -rf ~/.ecr
upload-postgres-extensions-to-s3:
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
# if: |
# (github.ref_name == 'main' || github.ref_name == 'release') &&
# github.event_name != 'workflow_dispatch'
runs-on: ${{ github.ref_name == 'release' && fromJSON('["self-hosted", "prod", "x64"]') || fromJSON('["self-hosted", "gen3", "small"]') }}
needs: [ tag, promote-images ]
strategy:
@@ -929,7 +929,7 @@ jobs:
env:
# While on transition period we extract public extensions from compute-node image and custom extensions from extensions image.
# Later all the extensions will be moved to extensions image.
# Later all the extensions will be moved to extensions image. (unless that is too slow)
EXTENSIONS_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:latest
COMPUTE_NODE_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:latest
AWS_ACCESS_KEY_ID: ${{ github.ref_name == 'release' && secrets.AWS_ACCESS_KEY_PROD || secrets.AWS_ACCESS_KEY_DEV }}
@@ -956,29 +956,23 @@ jobs:
- name: Extract postgres-extensions from container
run: |
rm -rf ./extensions-to-upload ./custom-extensions # Just in case
rm -rf ./extensions ./control_files # Just in case
mkdir extensions
mkdir control_files
# In compute image we have a bit different directory layout
mkdir -p extensions-to-upload/share
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/share/extension ./extensions-to-upload/share/extension
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/lib ./extensions-to-upload/lib
# TODO: Delete Neon extensitons (they always present on compute-node image)
# rm -rf ./extensions/share/extension/neon*
# rm -rf ./extensions/lib/neon*
# Delete Neon extensitons (they always present on compute-node image)
rm -rf ./extensions-to-upload/share/extension/neon*
rm -rf ./extensions-to-upload/lib/neon*
docker cp ${{ steps.create-container.outputs.EID }}:/extensions ./custom-extensions
for EXT_NAME in $(ls ./custom-extensions); do
mkdir -p ./extensions-to-upload/${EXT_NAME}/share
mv ./custom-extensions/${EXT_NAME}/share/extension ./extensions-to-upload/${EXT_NAME}/share/extension
mv ./custom-extensions/${EXT_NAME}/lib ./extensions-to-upload/${EXT_NAME}/lib
done
docker cp ${{ steps.create-container.outputs.EID }}:/ ./
# sh ./scripts/combine_control_files.sh
echo '{ "enabled_extensions": { "123454321": [ "anon" ], "public": [ "embedding" ] }, "control_data": { "embedding": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true", "anon": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n" } }' > ext_index.json
- name: Upload postgres-extensions to S3
run: |
for BUCKET in $(echo ${S3_BUCKETS}); do
aws s3 cp --recursive --only-show-errors ./extensions-to-upload s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}
aws s3 cp --recursive --only-show-errors ./extensions s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}
aws s3 cp --only-show-errors ./ext_index.json s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}
done
- name: Cleanup

209
Cargo.lock generated
View File

@@ -200,17 +200,6 @@ dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -805,18 +794,6 @@ dependencies = [
"libloading",
]
[[package]]
name = "clap"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"bitflags",
"clap_lex 0.2.4",
"indexmap",
"textwrap",
]
[[package]]
name = "clap"
version = "4.3.0"
@@ -837,7 +814,7 @@ dependencies = [
"anstream",
"anstyle",
"bitflags",
"clap_lex 0.5.0",
"clap_lex",
"strsim",
]
@@ -853,15 +830,6 @@ dependencies = [
"syn 2.0.16",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "clap_lex"
version = "0.5.0"
@@ -915,7 +883,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap 4.3.0",
"clap",
"compute_api",
"futures",
"hyper",
@@ -977,7 +945,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.3.0",
"clap",
"comfy-table",
"compute_api",
"git-version",
@@ -1047,19 +1015,19 @@ dependencies = [
[[package]]
name = "criterion"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [
"anes",
"atty",
"cast",
"ciborium",
"clap 3.2.25",
"clap",
"criterion-plot",
"is-terminal",
"itertools",
"lazy_static",
"num-traits",
"once_cell",
"oorandom",
"plotters",
"rayon",
@@ -1140,7 +1108,7 @@ dependencies = [
"crossterm_winapi",
"libc",
"mio",
"parking_lot",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
@@ -1210,7 +1178,7 @@ dependencies = [
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core",
"parking_lot_core 0.9.7",
]
[[package]]
@@ -1676,15 +1644,6 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.2.6"
@@ -1939,6 +1898,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
@@ -2267,16 +2229,6 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2504,31 +2456,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "os_str_bytes"
version = "6.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267"
[[package]]
name = "outref"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagectl"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"clap 4.3.0",
"clap",
"git-version",
"pageserver",
"postgres_ffi",
@@ -2547,7 +2487,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 4.3.0",
"clap",
"close_fds",
"const_format",
"consumption_metrics",
@@ -2629,6 +2569,17 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -2636,7 +2587,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.7",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
@@ -2652,6 +2617,16 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "pbkdf2"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0ca0b5a68607598bf3bad68f32227a8164f6254833f84eafaac409cd6746c31"
dependencies = [
"digest",
"hmac",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@@ -2957,7 +2932,7 @@ dependencies = [
"lazy_static",
"libc",
"memchr",
"parking_lot",
"parking_lot 0.12.1",
"procfs",
"thiserror",
]
@@ -3022,12 +2997,11 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"atty",
"base64 0.13.1",
"bstr",
"bytes",
"chrono",
"clap 4.3.0",
"clap",
"consumption_metrics",
"futures",
"git-version",
@@ -3045,7 +3019,8 @@ dependencies = [
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot",
"parking_lot 0.12.1",
"pbkdf2",
"pin-project-lite",
"postgres-native-tls",
"postgres_backend",
@@ -3056,6 +3031,7 @@ dependencies = [
"regex",
"reqwest",
"reqwest-middleware",
"reqwest-retry",
"reqwest-tracing",
"routerify",
"rstest",
@@ -3291,6 +3267,29 @@ dependencies = [
"thiserror",
]
[[package]]
name = "reqwest-retry"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d0fd6ef4c6d23790399fe15efc8d12cd9f3d4133958f9bd7801ee5cbaec6c4"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"getrandom",
"http",
"hyper",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
"retry-policies",
"task-local-extensions",
"tokio",
"tracing",
"wasm-timer",
]
[[package]]
name = "reqwest-tracing"
version = "0.4.4"
@@ -3309,6 +3308,17 @@ dependencies = [
"tracing-opentelemetry",
]
[[package]]
name = "retry-policies"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e09bbcb5003282bcb688f0bae741b278e9c7e8f378f561522c9806c58e075d9b"
dependencies = [
"anyhow",
"chrono",
"rand",
]
[[package]]
name = "ring"
version = "0.16.20"
@@ -3507,7 +3517,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 4.3.0",
"clap",
"const_format",
"crc32c",
"fs2",
@@ -3518,7 +3528,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"parking_lot",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres_backend",
@@ -3937,7 +3947,7 @@ dependencies = [
"anyhow",
"async-stream",
"bytes",
"clap 4.3.0",
"clap",
"const_format",
"futures",
"futures-core",
@@ -3947,7 +3957,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"parking_lot",
"parking_lot 0.12.1",
"prost",
"tokio",
"tokio-stream",
@@ -4118,12 +4128,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "textwrap"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.40"
@@ -4281,7 +4285,7 @@ dependencies = [
"futures-channel",
"futures-util",
"log",
"parking_lot",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
@@ -4539,7 +4543,7 @@ name = "trace"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.3.0",
"clap",
"pageserver_api",
"utils",
"workspace_hack",
@@ -4641,7 +4645,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -4810,7 +4813,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"atty",
"bincode",
"byteorder",
"bytes",
@@ -4887,7 +4889,7 @@ name = "wal_craft"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.3.0",
"clap",
"env_logger",
"log",
"once_cell",
@@ -4991,6 +4993,21 @@ version = "0.2.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93"
[[package]]
name = "wasm-timer"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f"
dependencies = [
"futures",
"js-sys",
"parking_lot 0.11.2",
"pin-utils",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.63"
@@ -5252,7 +5269,7 @@ dependencies = [
"anyhow",
"bytes",
"chrono",
"clap 4.3.0",
"clap",
"clap_builder",
"crossbeam-utils",
"either",

View File

@@ -34,7 +34,6 @@ license = "Apache-2.0"
anyhow = { version = "1.0", features = ["backtrace"] }
async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.27"
aws-smithy-http = "0.55"
@@ -87,6 +86,7 @@ opentelemetry = "0.18.0"
opentelemetry-otlp = { version = "0.11.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.10.0"
parking_lot = "0.12"
pbkdf2 = "0.12.1"
pin-project-lite = "0.2"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
@@ -95,6 +95,7 @@ regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
reqwest-retry = "0.2.2"
routerify = "3"
rpds = "0.13"
rustls = "0.20"
@@ -128,7 +129,7 @@ tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter"] }
url = "2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
walkdir = "2.3.2"
@@ -170,7 +171,7 @@ utils = { version = "0.1", path = "./libs/utils/" }
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
## Build dependencies
criterion = "0.4"
criterion = "0.5.1"
rcgen = "0.10"
rstest = "0.17"
tempfile = "3.4"

View File

@@ -189,8 +189,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.0.tar.gz -O pgvector.tar.gz && \
echo "b76cf84ddad452cc880a6c8c661d137ddd8679c000a16332f4f03ecf6e10bcc8 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -515,6 +515,31 @@ RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
#########################################################################################
#
# Layer "pg-embedding-pg-build"
# compile pg_embedding extension
#
#########################################################################################
FROM build-deps AS pg-embedding-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
# 2465f831ea1f8d49c1d74f8959adb7fc277d70cd made on 05/07/2023
# There is no release tag yet
RUN wget https://github.com/neondatabase/pg_embedding/archive/2465f831ea1f8d49c1d74f8959adb7fc277d70cd.tar.gz -O pg_embedding.tar.gz && \
echo "047af2b1f664a1e6e37867bd4eeaf5934fa27d6ba3d6c4461efa388ddf7cd1d5 pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sort > /before_embedding.txt && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/embedding.control &&\
find /usr/local/pgsql -type f | sort > /after_embedding.txt &&\
/bin/bash -c 'for from in $(comm -13 /before_embedding.txt /after_embedding.txt); do to=/extensions/embedding/${from:17} && mkdir -p $(dirname ${to}) && cp -a ${from} ${to}; done' && \
tar -zcvf /extensions/embedding.tar.gz /extensions/embedding && \
mkdir -p /control_files &&\
cp /usr/local/pgsql/share/extension/embedding.control /control_files/embedding.control
#########################################################################################
#
# Layer "pg-anon-pg-build"
@@ -533,7 +558,15 @@ RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgre
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control && \
find /usr/local/pgsql -type f | sort > /after.txt && \
/bin/bash -c 'for from in $(comm -13 /before.txt /after.txt); do to=/extensions/anon/${from:17} && mkdir -p $(dirname ${to}) && cp -a ${from} ${to}; done'
/bin/bash -c 'for from in $(comm -13 /before.txt /after.txt); do to=/extensions/anon/${from:17} && mkdir -p $(dirname ${to}) && cp -a ${from} ${to}; done' && \
mkdir /control_files &&\
tar -zcvf /extensions/anon.tar.gz /extensions/anon && \
mkdir -p /control_files &&\
cp /usr/local/pgsql/share/extension/anon.control /control_files/anon.control
# # TODO: Delete leftovers from the extension build step
# rm -rf ./extensions/lib/pgxs
# rm -rf ./extensions/lib/pkgconfig
#########################################################################################
#
@@ -671,6 +704,7 @@ COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -721,19 +755,24 @@ RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Extenstion only
# Extension only
#
#########################################################################################
FROM scratch AS postgres-extensions
# After the transition this layer will include all extensitons.
# As for now, it's only for new custom ones
#
# # Default extensions
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/share/extension /usr/local/pgsql/share/extension
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/lib /usr/local/pgsql/lib
# Custom extensions
COPY --from=pg-anon-pg-build /extensions/anon/lib/ /extensions/anon/lib
COPY --from=pg-anon-pg-build /extensions/anon/share/extension /extensions/anon/share/extension
# extensions
COPY --from=pg-anon-pg-build /extensions/anon.tar.gz /extensions/anon.tar.gz
COPY --from=pg-anon-pg-build /control_files/anon.control /control_files/anon.control
COPY --from=pg-embedding-pg-build /extensions/embedding.tar.gz /extensions/embedding.tar.gz
COPY --from=pg-embedding-pg-build /control_files/embedding.control /control_files/embedding.control
FROM python:3.11-slim-bookworm AS alekpython
COPY scripts/combine_control_files.py combine_control_files.py
COPY --from=postgres-extensions /control_files /control_files
CMD [ "python3", "./combine_control_files.py"]
#########################################################################################
#

View File

@@ -132,13 +132,13 @@ Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (r
# Create repository in .neon with proper paths to binaries and data
# Later that would be responsibility of a package install script
> cargo neon init
Starting pageserver at '127.0.0.1:64000' in '.neon'.
Initializing pageserver node 1 at '127.0.0.1:64000' in ".neon"
# start pageserver, safekeeper, and broker for their intercommunication
> cargo neon start
Starting neon broker at 127.0.0.1:50051
Starting neon broker at 127.0.0.1:50051.
storage_broker started, pid: 2918372
Starting pageserver at '127.0.0.1:64000' in '.neon'.
Starting pageserver node 1 at '127.0.0.1:64000' in ".neon".
pageserver started, pid: 2918386
Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1'.
safekeeper 1 started, pid: 2918437
@@ -152,8 +152,7 @@ Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> cargo neon endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55432/postgres'
# check list of running postgres instances
> cargo neon endpoint list
@@ -189,18 +188,17 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
# start postgres on that branch
> cargo neon endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55434/postgres'
# check the new list of running postgres instances
> cargo neon endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running
migration_check 127.0.0.1:55434 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running
# this new postgres instance will have all the data from 'main' postgres,
# but all modifications would not affect data in original postgres
> psql -p55433 -h 127.0.0.1 -U cloud_admin postgres
> psql -p55434 -h 127.0.0.1 -U cloud_admin postgres
postgres=# select * from t;
key | value
-----+-------

View File

@@ -256,6 +256,16 @@ fn main() -> Result<()> {
exit_code = ecode.code()
}
// Maybe sync safekeepers again, to speed up next startup
let compute_state = compute.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
info!("syncing safekeepers on shutdown");
let storage_auth_token = pspec.storage_auth_token.clone();
let lsn = compute.sync_safekeepers(storage_auth_token)?;
info!("synced safekeepers at lsn {lsn}");
}
if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}

View File

@@ -278,7 +278,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip_all)]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
@@ -516,9 +516,9 @@ impl ComputeNode {
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
self.apply_config(&compute_state)?;
}
@@ -526,11 +526,16 @@ impl ComputeNode {
let startup_end_time = Utc::now();
{
let mut state = self.state.lock().unwrap();
state.metrics.config_ms = startup_end_time
state.metrics.start_postgres_ms = config_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
state.metrics.config_ms = startup_end_time
.signed_duration_since(config_time)
.to_std()
.unwrap()
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(compute_state.start_time)
.to_std()

View File

@@ -18,6 +18,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(std::io::stderr);

View File

@@ -180,6 +180,11 @@ pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> any
}
// Wait until process is gone
wait_until_stopped(process_name, pid)?;
Ok(())
}
pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
for retries in 0..RETRIES {
match process_has_stopped(pid) {
Ok(true) => {

View File

@@ -405,6 +405,16 @@ impl Endpoint {
String::from_utf8_lossy(&pg_ctl.stderr),
);
}
// Also wait for the compute_ctl process to die. It might have some cleanup
// work to do after postgres stops, like syncing safekeepers, etc.
//
// TODO use background_process::stop_process instead
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
crate::background_process::wait_until_stopped("compute_ctl", pid)?;
Ok(())
}
@@ -507,7 +517,13 @@ impl Endpoint {
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
let _child = cmd.spawn()?;
let child = cmd.spawn()?;
// Write down the pid so we can wait for it when we want to stop
// TODO use background_process::start_process instead
let pid = child.id();
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
std::fs::write(pidfile_path, pid.to_string())?;
// Wait for it to start
let mut attempt = 0;

View File

@@ -189,7 +189,7 @@ services:
- "/bin/bash"
- "-c"
command:
- "until pg_isready -h compute -p 55433 ; do
- "until pg_isready -h compute -p 55433 -U cloud_admin ; do
echo 'Waiting to start compute...' && sleep 1;
done"
depends_on:

View File

@@ -48,6 +48,7 @@ Creating docker-compose_storage_broker_1 ... done
2. connect compute node
```
$ echo "localhost:55433:postgres:cloud_admin:cloud_admin" >> ~/.pgpass
$ chmod 600 ~/.pgpass
$ psql -h localhost -p 55433 -U cloud_admin
postgres=# CREATE TABLE t(key int primary key, value text);
CREATE TABLE

View File

@@ -71,6 +71,7 @@ pub struct ComputeMetrics {
pub wait_for_spec_ms: u64,
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub start_postgres_ms: u64,
pub config_ms: u64,
pub total_startup_ms: u64,
}

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
atty.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true

View File

@@ -84,7 +84,7 @@ pub fn init(
let r = r.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_ansi(false)
.with_writer(std::io::stdout);
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),

View File

@@ -24,6 +24,8 @@ const RESIDENT_SIZE: &str = "resident_size";
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
#[derive(Serialize, Debug)]
struct Ids {
@@ -73,7 +75,10 @@ pub async fn collect_metrics(
);
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
@@ -83,7 +88,7 @@ pub async fn collect_metrics(
info!("collect_metrics received cancellation request");
return Ok(());
},
_ = ticker.tick() => {
tick_at = ticker.tick() => {
// send cached metrics every cached_metric_collection_interval
let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
@@ -93,6 +98,12 @@ pub async fn collect_metrics(
}
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
}
@@ -223,14 +234,18 @@ pub async fn collect_metrics_iteration(
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
let tenant_synthetic_size = tenant.get_cached_synthetic_size();
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: SYNTHETIC_STORAGE_SIZE,
},
tenant_synthetic_size,
));
if tenant_synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: SYNTHETIC_STORAGE_SIZE,
},
tenant_synthetic_size,
));
}
}
// Filter metrics, unless we want to send all metrics, including cached ones.
@@ -273,31 +288,42 @@ pub async fn collect_metrics_iteration(
})
.expect("PageserverConsumptionMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.send()
.await;
const MAX_RETRIES: u32 = 3;
match res {
Ok(res) => {
if res.status().is_success() {
// update cached metrics after they were sent successfully
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk_to_send.iter() {
// Report if the metric value is suspiciously large
if metric.value > (1u64 << 40) {
for attempt in 0..MAX_RETRIES {
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.send()
.await;
match res {
Ok(res) => {
if res.status().is_success() {
// update cached metrics after they were sent successfully
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk_to_send
.iter()
.filter(|metric| metric.value > (1u64 << 40))
{
// Report if the metric value is suspiciously large
error!("potentially abnormal metric value: {:?}", metric);
}
}
break;
}
Err(err) if err.is_timeout() => {
error!(attempt, "timeout sending metrics, retrying immediately");
continue;
}
Err(err) => {
error!(attempt, ?err, "failed to send metrics");
break;
}
}
Err(err) => {
error!("failed to send metrics: {:?}", err);
}
}
}
@@ -317,7 +343,7 @@ pub async fn calculate_synthetic_size_worker(
_ = task_mgr::shutdown_watcher() => {
return Ok(());
},
_ = ticker.tick() => {
tick_at = ticker.tick() => {
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
@@ -343,6 +369,12 @@ pub async fn calculate_synthetic_size_worker(
}
}
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
synthetic_size_calculation_interval,
"consumption_metrics_synthetic_size_worker",
);
}
}
}

View File

@@ -110,7 +110,6 @@ pub fn launch_disk_usage_global_eviction_task(
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
.await;
info!("disk usage based eviction task finishing");
Ok(())
},
);
@@ -126,13 +125,16 @@ async fn disk_usage_eviction_task(
tenants_dir: &Path,
cancel: CancellationToken,
) {
scopeguard::defer! {
info!("disk usage based eviction task finishing");
};
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
.await
.is_err()
{
info!("shutting down");
return;
}
}
@@ -167,7 +169,6 @@ async fn disk_usage_eviction_task(
tokio::select! {
_ = tokio::time::sleep_until(sleep_until) => {},
_ = cancel.cancelled() => {
info!("shutting down");
break
}
}
@@ -314,7 +315,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
partition,
candidate.layer.get_tenant_id(),
candidate.layer.get_timeline_id(),
candidate.layer.filename().file_name(),
candidate.layer,
);
}

View File

@@ -722,6 +722,12 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"406":
description: Permanently unsatisfiable request, don't retry.
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"409":
description: Timeline already exists, creation skipped
content:

View File

@@ -338,6 +338,11 @@ async fn timeline_create_handler(
Err(tenant::CreateTimelineError::AlreadyExists) => {
json_response(StatusCode::CONFLICT, ())
}
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
format!("{err:#}")
))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}

View File

@@ -1,9 +1,9 @@
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
UIntGauge, UIntGaugeVec,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge,
register_uint_gauge_vec, Counter, CounterVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::models::TenantState;
@@ -130,6 +130,122 @@ pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub struct PageCacheMetrics {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_ephemeral: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_ephemeral: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_hits_total",
"Number of read accesses to the page cache that hit",
&["key_kind", "hit_kind"]
)
.expect("failed to define a metric")
});
static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_accesses_total",
"Number of read accesses to the page cache",
&["key_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
},
read_accesses_ephemeral: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
read_hits_ephemeral: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["ephemeral", "-"])
.unwrap()
},
read_hits_immutable: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["immutable", "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "exact"])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
.unwrap()
},
});
pub struct PageCacheSizeMetrics {
pub max_bytes: UIntGauge,
pub current_bytes_ephemeral: UIntGauge,
pub current_bytes_immutable: UIntGauge,
pub current_bytes_materialized_page: UIntGauge,
}
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_page_cache_size_current_bytes",
"Current size of the page cache in bytes, by key kind",
&["key_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheSizeMetrics {
max_bytes: {
register_uint_gauge!(
"pageserver_page_cache_size_max_bytes",
"Maximum size of the page cache in bytes"
)
.expect("failed to define a metric")
},
current_bytes_ephemeral: {
PAGE_CACHE_SIZE_CURRENT_BYTES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
},
current_bytes_immutable: {
PAGE_CACHE_SIZE_CURRENT_BYTES
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
current_bytes_materialized_page: {
PAGE_CACHE_SIZE_CURRENT_BYTES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
},
});
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_wait_lsn_seconds",
@@ -204,11 +320,11 @@ pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
pub static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_synthetic_size",
"Synthetic size of each tenant",
"pageserver_tenant_synthetic_cached_size_bytes",
"Synthetic size of each tenant in bytes",
&["tenant_id"]
)
.expect("Failed to register pageserver_tenant_synthetic_size metric")
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
});
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
@@ -968,7 +1084,6 @@ impl RemoteTimelineClientMetrics {
op_kind: &RemoteOpKind,
status: &'static str,
) -> Histogram {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.remote_operation_time.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str(), status);
let metric = guard.entry(key).or_insert_with(move || {
@@ -990,7 +1105,6 @@ impl RemoteTimelineClientMetrics {
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntGauge {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.calls_unfinished_gauge.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
@@ -1011,7 +1125,6 @@ impl RemoteTimelineClientMetrics {
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> Histogram {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.calls_started_hist.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
@@ -1032,7 +1145,6 @@ impl RemoteTimelineClientMetrics {
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_started_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {
@@ -1053,7 +1165,6 @@ impl RemoteTimelineClientMetrics {
file_kind: &RemoteOpFileKind,
op_kind: &RemoteOpKind,
) -> IntCounter {
// XXX would be nice to have an upgradable RwLock
let mut guard = self.bytes_finished_counter.lock().unwrap();
let key = (file_kind.as_str(), op_kind.as_str());
let metric = guard.entry(key).or_insert_with(move || {

View File

@@ -53,8 +53,8 @@ use utils::{
lsn::Lsn,
};
use crate::repository::Key;
use crate::tenant::writeback_ephemeral_file;
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -187,6 +187,8 @@ pub struct PageCache {
/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
size_metrics: &'static PageCacheSizeMetrics,
}
///
@@ -313,6 +315,10 @@ impl PageCache {
key: &Key,
lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE
.read_accesses_materialized_page
.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_id,
@@ -323,8 +329,21 @@ impl PageCache {
};
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
Some((lsn, guard))
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.read_hits_materialized_page_older_lsn
.inc();
}
Some((available_lsn, guard))
} else {
panic!("unexpected key type in slot");
}
@@ -499,11 +518,31 @@ impl PageCache {
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
),
};
read_access.inc();
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) =
@@ -681,6 +720,9 @@ impl PageCache {
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
versions.remove(version_idx);
self.size_metrics
.current_bytes_materialized_page
.sub_page_sz(1);
if versions.is_empty() {
old_entry.remove_entry();
}
@@ -693,11 +735,13 @@ impl PageCache {
let mut map = self.ephemeral_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
}
}
}
@@ -725,6 +769,9 @@ impl PageCache {
slot_idx,
},
);
self.size_metrics
.current_bytes_materialized_page
.add_page_sz(1);
None
}
}
@@ -735,6 +782,7 @@ impl PageCache {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
None
}
}
@@ -745,6 +793,7 @@ impl PageCache {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
self.size_metrics.current_bytes_immutable.add_page_sz(1);
None
}
}
@@ -844,6 +893,12 @@ impl PageCache {
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_ephemeral.set_page_sz(0);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
@@ -866,6 +921,30 @@ impl PageCache {
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
}
}
}
trait PageSzBytesMetric {
fn set_page_sz(&self, count: usize);
fn add_page_sz(&self, count: usize);
fn sub_page_sz(&self, count: usize);
}
#[inline(always)]
fn count_times_page_sz(count: usize) -> u64 {
u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
}
impl PageSzBytesMetric for metrics::UIntGauge {
fn set_page_sz(&self, count: usize) {
self.set(count_times_page_sz(count));
}
fn add_page_sz(&self, count: usize) {
self.add(count_times_page_sz(count));
}
fn sub_page_sz(&self, count: usize) {
self.sub(count_times_page_sz(count));
}
}

View File

@@ -11,7 +11,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, ensure, Context};
use anyhow::{bail, Context};
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
@@ -49,6 +49,8 @@ use std::time::{Duration, Instant};
use self::config::TenantConf;
use self::metadata::TimelineMetadata;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -68,6 +70,7 @@ use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
use crate::InitializationOrder;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::walredo::WalRedoManager;
@@ -87,6 +90,7 @@ pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod manifest;
mod span;
pub mod metadata;
mod par_fsync;
@@ -102,7 +106,7 @@ mod timeline;
pub mod size;
pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
@@ -161,200 +165,6 @@ pub struct Tenant {
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
}
/// A timeline with some of its files on disk, being initialized.
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory
/// to be removed on next restart.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
/// or gets removed eventually.
///
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
#[must_use]
struct TimelineUninitMark {
uninit_mark_deleted: bool,
uninit_mark_path: PathBuf,
timeline_path: PathBuf,
}
impl UninitializedTimeline<'_> {
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
/// uninit mark file.
///
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Occupied(_) => anyhow::bail!(
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
)
})?;
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
}
}
Ok(new_timeline)
}
/// Prepares timeline data by loading it from the basebackup archive.
pub async fn import_basebackup_from_tar(
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
raw_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
});
raw_timeline
.freeze_and_flush()
.await
.context("Failed to flush after basebackup import")?;
// All the data has been imported. Insert the Timeline into the tenant's timelines
// map and remove the uninit mark file.
let tl = self.finish_creation()?;
tl.activate(broker_client, None, ctx);
Ok(tl)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
Ok(&self
.raw_timeline
.as_ref()
.with_context(|| {
format!(
"No raw timeline {}/{} found",
self.owning_tenant.tenant_id, self.timeline_id
)
})?
.0)
}
}
impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant = %self.owning_tenant.tenant_id, timeline = %self.timeline_id).entered();
error!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(uninit_mark);
}
}
}
fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
let timeline_path = &uninit_mark.timeline_path;
match ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
Ok(()) => {
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
}
Err(e) => {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
}
impl TimelineUninitMark {
fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
Self {
uninit_mark_deleted: false,
uninit_mark_path,
timeline_path,
}
}
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
if !self.uninit_mark_deleted {
self.delete_mark_file_if_present()?;
}
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
})?;
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
self.uninit_mark_deleted = true;
Ok(())
}
}
impl Drop for TimelineUninitMark {
fn drop(&mut self) {
if !self.uninit_mark_deleted {
if self.timeline_path.exists() {
error!(
"Uninit mark {} is not removed, timeline {} stays uninitialized",
self.uninit_mark_path.display(),
self.timeline_path.display()
)
} else {
// unblock later timeline creation attempts
warn!(
"Removing intermediate uninit mark file {}",
self.uninit_mark_path.display()
);
if let Err(e) = self.delete_mark_file_if_present() {
error!("Failed to remove the uninit mark file: {e}")
}
}
}
}
}
// We should not blindly overwrite local metadata with remote one.
// For example, consider the following case:
// Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that
@@ -506,6 +316,8 @@ pub enum CreateTimelineError {
#[error("a timeline with the given ID already exists")]
AlreadyExists,
#[error(transparent)]
AncestorLsn(anyhow::Error),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -693,7 +505,7 @@ impl Tenant {
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
@@ -831,7 +643,7 @@ impl Tenant {
remote_client: RemoteTimelineClient,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
info!("downloading index file for timeline {}", timeline_id);
tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
@@ -910,7 +722,7 @@ impl Tenant {
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
Ok(conf) => conf,
@@ -1096,7 +908,7 @@ impl Tenant {
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
@@ -1142,7 +954,7 @@ impl Tenant {
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
@@ -1432,7 +1244,7 @@ impl Tenant {
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
if ancestor_ancestor_lsn > *lsn {
// can we safely just branch from the ancestor instead?
return Err(CreateTimelineError::Other(anyhow::anyhow!(
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
@@ -1733,7 +1545,7 @@ impl Tenant {
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
timeline::debug_assert_current_span_has_tenant_and_timeline_id();
debug_assert_current_span_has_tenant_and_timeline_id();
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
@@ -1897,7 +1709,7 @@ impl Tenant {
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
let mut activating = false;
self.state.send_modify(|current_state| {
@@ -1968,7 +1780,7 @@ impl Tenant {
///
/// This will attempt to shutdown even if tenant is broken.
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
debug_assert_current_span_has_tenant_id();
span::debug_assert_current_span_has_tenant_id();
// Set tenant (and its timlines) to Stoppping state.
//
// Since we can only transition into Stopping state after activation is complete,
@@ -2715,7 +2527,7 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let tl = self
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await?;
@@ -2732,7 +2544,7 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await
}
@@ -2743,7 +2555,7 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
_ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
@@ -2783,16 +2595,17 @@ impl Tenant {
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*latest_gc_cutoff_lsn,
))?;
))
.map_err(CreateTimelineError::AncestorLsn)?;
// and then the planned GC cutoff
{
let gc_info = src_timeline.gc_info.read().unwrap();
let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
if start_lsn < cutoff {
bail!(format!(
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
"invalid branch start lsn: less than planned GC cutoff {cutoff}"
));
)));
}
}
@@ -3009,11 +2822,11 @@ impl Tenant {
debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((timeline_struct, uninit_mark)),
})
Ok(UninitializedTimeline::new(
self,
new_timeline_id,
Some((timeline_struct, uninit_mark)),
))
}
fn create_timeline_files(
@@ -3825,6 +3638,9 @@ mod tests {
{
Ok(_) => panic!("branching should have failed"),
Err(err) => {
let CreateTimelineError::AncestorLsn(err) = err else {
panic!("wrong error type")
};
assert!(err.to_string().contains("invalid branch start lsn"));
assert!(err
.source()
@@ -3854,6 +3670,9 @@ mod tests {
{
Ok(_) => panic!("branching should have failed"),
Err(err) => {
let CreateTimelineError::AncestorLsn(err) = err else {
panic!("wrong error type");
};
assert!(&err.to_string().contains("invalid branch start lsn"));
assert!(&err
.source()
@@ -4562,28 +4381,3 @@ mod tests {
Ok(())
}
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_id() {}
#[cfg(debug_assertions)]
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
utils::tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
utils::tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
});
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_id() {
use utils::tracing_span_assert;
match tracing_span_assert::check_fields_present([&*TENANT_ID_EXTRACTOR]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}

View File

@@ -608,10 +608,7 @@ impl RemoteTimelineClient {
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!(
"scheduled layer file upload {}",
layer_file_name.file_name()
);
info!("scheduled layer file upload {layer_file_name}");
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
@@ -664,7 +661,7 @@ impl RemoteTimelineClient {
});
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file deletion {}", name.file_name());
info!("scheduled layer file deletion {name}");
}
// Launch the tasks immediately, if possible
@@ -828,7 +825,7 @@ impl RemoteTimelineClient {
.queued_operations
.push_back(op);
info!("scheduled layer file deletion {}", name.file_name());
info!("scheduled layer file deletion {name}");
deletions_queued += 1;
}

View File

@@ -16,7 +16,7 @@ use tracing::{info, warn};
use crate::config::PageServerConf;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;

View File

@@ -0,0 +1,20 @@
#[cfg(debug_assertions)]
use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
#[cfg(not(debug_assertions))]
pub(crate) fn debug_assert_current_span_has_tenant_id() {}
#[cfg(debug_assertions)]
pub(crate) static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<MultiNameExtractor<2>> =
once_cell::sync::Lazy::new(|| MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]));
#[cfg(debug_assertions)]
#[track_caller]
pub(crate) fn debug_assert_current_span_has_tenant_id() {
if let Err(missing) = check_fields_present([&*TENANT_ID_EXTRACTOR]) {
panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
)
}
}

View File

@@ -335,7 +335,7 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
pub trait Layer: std::fmt::Debug + Send + Sync {
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;
@@ -373,9 +373,6 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
/// Dump summary of the contents of the layer to stdout
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
}
@@ -512,10 +509,12 @@ pub mod tests {
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for LayerDescriptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}

View File

@@ -222,13 +222,14 @@ impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.desc.lsn_range.start,
self.desc.lsn_range.end
self.desc.lsn_range.end,
self.desc.file_size,
);
if !verbose {
@@ -394,10 +395,11 @@ impl Layer for DeltaLayer {
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for DeltaLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}

View File

@@ -210,9 +210,15 @@ pub enum LayerFileName {
impl LayerFileName {
pub fn file_name(&self) -> String {
self.to_string()
}
}
impl fmt::Display for LayerFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Image(fname) => fname.to_string(),
Self::Delta(fname) => fname.to_string(),
Self::Image(fname) => write!(f, "{fname}"),
Self::Delta(fname) => write!(f, "{fname}"),
}
}
}

View File

@@ -153,12 +153,14 @@ impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} ----",
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.lsn
self.lsn,
self.desc.is_incremental,
self.desc.file_size
);
if !verbose {
@@ -230,10 +232,12 @@ impl Layer for ImageLayer {
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for ImageLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}

View File

@@ -131,13 +131,6 @@ impl Layer for InMemoryLayer {
true
}
fn short_id(&self) -> String {
let inner = self.inner.read().unwrap();
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
format!("inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
}
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().unwrap();
@@ -240,6 +233,15 @@ impl Layer for InMemoryLayer {
}
}
impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.inner.read().unwrap();
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
}
}
impl InMemoryLayer {
///
/// Get layer size on the disk

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use core::fmt::Display;
use std::ops::Range;
use utils::{
id::{TenantId, TimelineId},
@@ -48,8 +49,8 @@ impl PersistentLayerDesc {
}
}
pub fn short_id(&self) -> String {
self.filename().file_name()
pub fn short_id(&self) -> impl Display {
self.filename()
}
#[cfg(test)]
@@ -173,13 +174,16 @@ impl PersistentLayerDesc {
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end
self.lsn_range.end,
self.is_delta,
self.is_incremental,
self.file_size,
);
Ok(())

View File

@@ -71,22 +71,22 @@ impl Layer for RemoteLayer {
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
);
bail!("layer {self} needs to be downloaded");
}
/// debugging function to print out the contents of the layer
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.desc.lsn_range.start,
self.desc.lsn_range.end
self.desc.lsn_range.end,
self.desc.is_delta,
self.desc.is_incremental,
self.desc.file_size,
);
Ok(())
@@ -106,10 +106,12 @@ impl Layer for RemoteLayer {
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}

View File

@@ -1,6 +1,9 @@
//!
mod eviction_task;
mod logical_size;
pub mod span;
pub mod uninit;
mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
@@ -8,7 +11,6 @@ use bytes::Bytes;
use fail::fail_point;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
@@ -17,7 +19,7 @@ use pageserver_api::models::{
use remote_storage::GenericRemoteStorage;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio::sync::{oneshot, watch, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
@@ -28,7 +30,7 @@ use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
@@ -38,6 +40,7 @@ use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
layer_map::{LayerMap, SearchResult},
@@ -79,6 +82,7 @@ use crate::{is_temporary, task_mgr};
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
@@ -128,7 +132,7 @@ impl LayerFileManager {
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
.get(&desc.key())
.with_context(|| format!("get layer from desc: {}", desc.filename().file_name()))
.with_context(|| format!("get layer from desc: {}", desc.filename()))
.expect("not found")
.clone()
}
@@ -365,126 +369,6 @@ pub struct Timeline {
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
}
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
/// reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
/// Increments are appended on walreceiver writing new timeline data,
/// which result in increase or decrease of the logical size.
struct LogicalSize {
/// Size, potentially slow to compute. Calculating this might require reading multiple
/// layers, and even ancestor's layers.
///
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
/// the initial size at a different LSN.
initial_logical_size: OnceCell<u64>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
initial_part_end: Option<Lsn>,
/// All other size changes after startup, combined together.
///
/// Size shouldn't ever be negative, but this is signed for two reasons:
///
/// 1. If we initialized the "baseline" size lazily, while we already
/// process incoming WAL, the incoming WAL records could decrement the
/// variable and temporarily make it negative. (This is just future-proofing;
/// the initialization is currently not done lazily.)
///
/// 2. If there is a bug and we e.g. forget to increment it in some cases
/// when size grows, but remember to decrement it when it shrinks again, the
/// variable could go negative. In that case, it seems better to at least
/// try to keep tracking it, rather than clamp or overflow it. Note that
/// get_current_logical_size() will clamp the returned value to zero if it's
/// negative, and log an error. Could set it permanently to zero or some
/// special value to indicate "broken" instead, but this will do for now.
///
/// Note that we also expose a copy of this value as a prometheus metric,
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
/// to modify this, it will also keep the prometheus metric in sync.
size_added_after_initial: AtomicI64,
}
/// Normalized current size, that the data in pageserver occupies.
#[derive(Debug, Clone, Copy)]
enum CurrentLogicalSize {
/// The size is not yet calculated to the end, this is an intermediate result,
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
/// yet total logical size cannot be below 0.
Approximate(u64),
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
// available for observation without any calculations.
Exact(u64),
}
impl CurrentLogicalSize {
fn size(&self) -> u64 {
*match self {
Self::Approximate(size) => size,
Self::Exact(size) => size,
}
}
}
impl LogicalSize {
fn empty_initial() -> Self {
Self {
initial_logical_size: OnceCell::with_value(0),
// initial_logical_size already computed, so, don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
}
}
fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
}
}
fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
// ^^^ keep this type explicit so that the casts in this function break if
// we change the type.
match self.initial_logical_size.get() {
Some(initial_size) => {
initial_size.checked_add_signed(size_increment)
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
.map(CurrentLogicalSize::Exact)
}
None => {
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
}
}
}
fn increment_size(&self, delta: i64) {
self.size_added_after_initial
.fetch_add(delta, AtomicOrdering::SeqCst);
}
/// Make the value computed by initial logical size computation
/// available for re-use. This doesn't contain the incremental part.
fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
match self.initial_part_end {
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
_ => None,
}
}
}
pub struct WalReceiverInfo {
pub wal_source_connconf: PgConnectionConfig,
pub last_received_msg_lsn: Lsn,
@@ -1381,9 +1265,9 @@ impl Timeline {
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
info!(layer=%local_layer.short_id(), "evicted layer after unknown residence period");
info!(layer=%local_layer, "evicted layer after unknown residence period");
}
true
@@ -2239,7 +2123,7 @@ impl Timeline {
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id();
span::debug_assert_current_span_has_tenant_and_timeline_id();
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
@@ -2462,11 +2346,7 @@ impl TraversalLayerExt for Arc<dyn PersistentLayer> {
format!("{}", local_path.display())
}
None => {
format!(
"remote {}/{}",
self.get_timeline_id(),
self.filename().file_name()
)
format!("remote {}/{self}", self.get_timeline_id())
}
}
}
@@ -2474,11 +2354,7 @@ impl TraversalLayerExt for Arc<dyn PersistentLayer> {
impl TraversalLayerExt for Arc<InMemoryLayer> {
fn traversal_id(&self) -> TraversalId {
format!(
"timeline {} in-memory {}",
self.get_timeline_id(),
self.short_id()
)
format!("timeline {} in-memory {self}", self.get_timeline_id())
}
}
@@ -2998,7 +2874,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
async fn flush_frozen_layer(
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
@@ -3677,7 +3553,7 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.inspect(|l| info!("compact requires download of {l}"))
.map(|l| {
l.clone()
.downcast_remote_layer()
@@ -3701,7 +3577,7 @@ impl Timeline {
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().file_name());
info!("compact includes {l}");
}
// We don't need the original list of layers anymore. Drop it so that
@@ -4316,8 +4192,8 @@ impl Timeline {
if l.get_lsn_range().end > horizon_cutoff {
debug!(
"keeping {} because it's newer than horizon_cutoff {}",
l.filename().file_name(),
horizon_cutoff
l.filename(),
horizon_cutoff,
);
result.layers_needed_by_cutoff += 1;
continue 'outer;
@@ -4327,8 +4203,8 @@ impl Timeline {
if l.get_lsn_range().end > pitr_cutoff {
debug!(
"keeping {} because it's newer than pitr_cutoff {}",
l.filename().file_name(),
pitr_cutoff
l.filename(),
pitr_cutoff,
);
result.layers_needed_by_pitr += 1;
continue 'outer;
@@ -4346,7 +4222,7 @@ impl Timeline {
if &l.get_lsn_range().start <= retain_lsn {
debug!(
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
l.filename().file_name(),
l.filename(),
retain_lsn,
l.is_incremental(),
);
@@ -4377,10 +4253,7 @@ impl Timeline {
if !layers
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
{
debug!(
"keeping {} because it is the latest layer",
l.filename().file_name()
);
debug!("keeping {} because it is the latest layer", l.filename());
// Collect delta key ranges that need image layers to allow garbage
// collecting the layers.
// It is not so obvious whether we need to propagate information only about
@@ -4397,7 +4270,7 @@ impl Timeline {
// We didn't find any reason to keep this file, so remove it.
debug!(
"garbage collecting {} is_dropped: xx is_incremental: {}",
l.filename().file_name(),
l.filename(),
l.is_incremental(),
);
layers_to_remove.push(Arc::clone(&l));
@@ -4551,12 +4424,12 @@ impl Timeline {
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
/// So, the current download attempt will run to completion even if we stop polling.
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
#[instrument(skip_all, fields(layer=%remote_layer))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
span::debug_assert_current_span_has_tenant_and_timeline_id();
use std::sync::atomic::Ordering::Relaxed;
@@ -4589,7 +4462,7 @@ impl Timeline {
TaskKind::RemoteDownloadTask,
Some(self.tenant_id),
Some(self.timeline_id),
&format!("download layer {}", remote_layer.short_id()),
&format!("download layer {}", remote_layer),
false,
async move {
let remote_client = self_clone.remote_client.as_ref().unwrap();
@@ -4865,15 +4738,12 @@ impl Timeline {
continue;
}
let last_activity_ts = l
.access_stats()
.latest_activity()
.unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%l, "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l,
@@ -4993,33 +4863,6 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
#[cfg(not(debug_assertions))]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
#[cfg(debug_assertions)]
#[inline]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
use utils::tracing_span_assert;
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
tracing_span_assert::MultiNameExtractor<2>,
> = once_cell::sync::Lazy::new(|| {
tracing_span_assert::MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
});
match tracing_span_assert::check_fields_present([
&*super::TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
]) {
Ok(()) => (),
Err(missing) => panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
),
}
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.

View File

@@ -70,7 +70,6 @@ impl Timeline {
};
self_clone.eviction_task(cancel).await;
info!("eviction task finishing");
Ok(())
},
);
@@ -78,6 +77,9 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
scopeguard::defer! {
info!("eviction task finishing");
}
use crate::tenant::tasks::random_init_delay;
{
let policy = self.get_eviction_policy();
@@ -86,7 +88,6 @@ impl Timeline {
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
return;
}
}
@@ -101,7 +102,6 @@ impl Timeline {
ControlFlow::Continue(sleep_until) => {
tokio::select! {
_ = cancel.cancelled() => {
info!("shutting down");
break;
}
_ = tokio::time::sleep_until(sleep_until) => { }
@@ -209,7 +209,7 @@ impl Timeline {
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now");
debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
SystemTime::now()
});

View File

@@ -0,0 +1,128 @@
use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::Semaphore;
use utils::lsn::Lsn;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::Arc;
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
/// reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
/// Increments are appended on walreceiver writing new timeline data,
/// which result in increase or decrease of the logical size.
pub(super) struct LogicalSize {
/// Size, potentially slow to compute. Calculating this might require reading multiple
/// layers, and even ancestor's layers.
///
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
/// the initial size at a different LSN.
pub initial_logical_size: OnceCell<u64>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
pub initial_part_end: Option<Lsn>,
/// All other size changes after startup, combined together.
///
/// Size shouldn't ever be negative, but this is signed for two reasons:
///
/// 1. If we initialized the "baseline" size lazily, while we already
/// process incoming WAL, the incoming WAL records could decrement the
/// variable and temporarily make it negative. (This is just future-proofing;
/// the initialization is currently not done lazily.)
///
/// 2. If there is a bug and we e.g. forget to increment it in some cases
/// when size grows, but remember to decrement it when it shrinks again, the
/// variable could go negative. In that case, it seems better to at least
/// try to keep tracking it, rather than clamp or overflow it. Note that
/// get_current_logical_size() will clamp the returned value to zero if it's
/// negative, and log an error. Could set it permanently to zero or some
/// special value to indicate "broken" instead, but this will do for now.
///
/// Note that we also expose a copy of this value as a prometheus metric,
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
/// to modify this, it will also keep the prometheus metric in sync.
pub size_added_after_initial: AtomicI64,
}
/// Normalized current size, that the data in pageserver occupies.
#[derive(Debug, Clone, Copy)]
pub(super) enum CurrentLogicalSize {
/// The size is not yet calculated to the end, this is an intermediate result,
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
/// yet total logical size cannot be below 0.
Approximate(u64),
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
// available for observation without any calculations.
Exact(u64),
}
impl CurrentLogicalSize {
pub(super) fn size(&self) -> u64 {
*match self {
Self::Approximate(size) => size,
Self::Exact(size) => size,
}
}
}
impl LogicalSize {
pub(super) fn empty_initial() -> Self {
Self {
initial_logical_size: OnceCell::with_value(0),
// initial_logical_size already computed, so, don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
}
}
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
}
}
pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
// ^^^ keep this type explicit so that the casts in this function break if
// we change the type.
match self.initial_logical_size.get() {
Some(initial_size) => {
initial_size.checked_add_signed(size_increment)
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
.map(CurrentLogicalSize::Exact)
}
None => {
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
}
}
}
pub(super) fn increment_size(&self, delta: i64) {
self.size_added_after_initial
.fetch_add(delta, AtomicOrdering::SeqCst);
}
/// Make the value computed by initial logical size computation
/// available for re-use. This doesn't contain the incremental part.
pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
match self.initial_part_end {
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
_ => None,
}
}
}

View File

@@ -0,0 +1,25 @@
#[cfg(debug_assertions)]
use utils::tracing_span_assert::{check_fields_present, Extractor, MultiNameExtractor};
#[cfg(not(debug_assertions))]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
#[cfg(debug_assertions)]
#[track_caller]
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<MultiNameExtractor<2>> =
once_cell::sync::Lazy::new(|| {
MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
});
let fields: [&dyn Extractor; 2] = [
&*crate::tenant::span::TENANT_ID_EXTRACTOR,
&*TIMELINE_ID_EXTRACTOR,
];
if let Err(missing) = check_fields_present(fields) {
panic!(
"missing extractors: {:?}",
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
)
}
}

View File

@@ -0,0 +1,219 @@
use std::{collections::hash_map::Entry, fs, path::PathBuf, sync::Arc};
use anyhow::Context;
use tracing::{error, info, info_span, warn};
use utils::{crashsafe, id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
import_datadir,
tenant::{ignore_absent_files, Tenant},
};
use super::Timeline;
/// A timeline with some of its files on disk, being initialized.
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory
/// to be removed on next restart.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
}
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
) -> Self {
Self {
owning_tenant,
timeline_id,
raw_timeline,
}
}
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
/// uninit mark file.
///
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Occupied(_) => anyhow::bail!(
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
)
})?;
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
}
}
Ok(new_timeline)
}
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
raw_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
raw_timeline
.freeze_and_flush()
.await
.context("Failed to flush after basebackup import")?;
// All the data has been imported. Insert the Timeline into the tenant's timelines
// map and remove the uninit mark file.
let tl = self.finish_creation()?;
tl.activate(broker_client, None, ctx);
Ok(tl)
}
pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
Ok(&self
.raw_timeline
.as_ref()
.with_context(|| {
format!(
"No raw timeline {}/{} found",
self.owning_tenant.tenant_id, self.timeline_id
)
})?
.0)
}
}
impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant = %self.owning_tenant.tenant_id, timeline = %self.timeline_id).entered();
error!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(uninit_mark);
}
}
}
pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
let timeline_path = &uninit_mark.timeline_path;
match ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
Ok(()) => {
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
}
Err(e) => {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
/// or gets removed eventually.
///
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
#[must_use]
pub(crate) struct TimelineUninitMark {
uninit_mark_deleted: bool,
uninit_mark_path: PathBuf,
pub(crate) timeline_path: PathBuf,
}
impl TimelineUninitMark {
pub(crate) fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
Self {
uninit_mark_deleted: false,
uninit_mark_path,
timeline_path,
}
}
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
if !self.uninit_mark_deleted {
self.delete_mark_file_if_present()?;
}
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
})?;
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
self.uninit_mark_deleted = true;
Ok(())
}
}
impl Drop for TimelineUninitMark {
fn drop(&mut self) {
if !self.uninit_mark_deleted {
if self.timeline_path.exists() {
error!(
"Uninit mark {} is not removed, timeline {} stays uninitialized",
self.uninit_mark_path.display(),
self.timeline_path.display()
)
} else {
// unblock later timeline creation attempts
warn!(
"Removing intermediate uninit mark file {}",
self.uninit_mark_path.display()
);
if let Err(e) = self.delete_mark_file_if_present() {
error!("Failed to remove the uninit mark file: {e}")
}
}
}
}
}

View File

@@ -71,6 +71,8 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
node: NodeId,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
WALRECEIVER_STARTED_CONNECTIONS.inc();
// Connect to the database in replication mode.
@@ -140,6 +142,9 @@ pub(super) async fn handle_walreceiver_connection(
}
Ok(())
}
// Enrich the log lines emitted by this closure with meaningful context.
// TODO: technically, this task outlives the surrounding function, so, the
// spans won't be properly nested.
.instrument(tracing::info_span!("poller")),
);

View File

@@ -302,15 +302,6 @@ impl VirtualFile {
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
//
// TODO: We could downgrade the locks to read mode before calling
// 'func', to allow a little bit more concurrency, but the standard
// library RwLock doesn't allow downgrading without releasing the lock,
// and that doesn't seem worth the trouble.
//
// XXX: `parking_lot::RwLock` can enable such downgrades, yet its implementation is fair and
// may deadlock on subsequent read calls.
// Simply replacing all `RwLock` in project causes deadlocks, so use it sparingly.
let result = STORAGE_IO_TIME
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
.observe_closure_duration(|| func(&file));

View File

@@ -122,6 +122,43 @@ hnsw_populate(HierarchicalNSW* hnsw, Relation indexRel, Relation heapRel)
true, true, hnsw_build_callback, (void *) hnsw, NULL);
}
#ifdef __APPLE__
#include <sys/types.h>
#include <sys/sysctl.h>
static void
hnsw_check_available_memory(Size requested)
{
size_t total;
if (sysctlbyname("hw.memsize", NULL, &total, NULL, 0) < 0)
elog(ERROR, "Failed to get amount of RAM: %m");
if ((Size)NBuffers*BLCKSZ + requested >= total)
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
requested, total - (Size)NBuffers*BLCKSZ);
}
#else
#include <sys/sysinfo.h>
static void
hnsw_check_available_memory(Size requested)
{
struct sysinfo si;
Size total;
if (sysinfo(&si) < 0)
elog(ERROR, "Failed to get amount of RAM: %n");
total = si.totalram*si.mem_unit;
if ((Size)NBuffers*BLCKSZ + requested >= total)
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
requested, total - (Size)NBuffers*BLCKSZ);
}
#endif
static HierarchicalNSW*
hnsw_get_index(Relation indexRel, Relation heapRel)
{
@@ -156,6 +193,8 @@ hnsw_get_index(Relation indexRel, Relation heapRel)
size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
shmem_size = hnsw_sizeof() + maxelements * size_data_per_element;
hnsw_check_available_memory(shmem_size);
/* first try to attach to existed index */
if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
&mapped_address, &mapped_size, DEBUG1))

View File

@@ -1,4 +1,4 @@
comment = 'hNsw index'
comment = 'hnsw index'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true

View File

@@ -32,6 +32,7 @@
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
#include "libpq/crypt.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
@@ -161,7 +162,22 @@ ConstructDeltaMessage()
PushKeyValue(&state, "name", entry->name);
if (entry->password)
{
#if PG_MAJORVERSION_NUM == 14
char *logdetail;
#else
const char *logdetail;
#endif
PushKeyValue(&state, "password", (char *) entry->password);
char *encrypted_password = get_role_password(entry->name, &logdetail);
if (encrypted_password)
{
PushKeyValue(&state, "encrypted_password", encrypted_password);
}
else
{
elog(ERROR, "Failed to get encrypted password: %s", logdetail);
}
}
if (entry->old_name[0] != '\0')
{

View File

@@ -190,7 +190,7 @@ lfc_change_limit_hook(int newval, void *extra)
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
elog(LOG, "set local file cache limit to %d", new_size);
elog(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}

View File

@@ -34,7 +34,6 @@
#define PageStoreTrace DEBUG5
#define MAX_RECONNECT_ATTEMPTS 5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
@@ -55,13 +54,15 @@ int32 max_cluster_size;
char *page_server_connstring;
char *neon_auth_token;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static void pageserver_flush(void);
static bool pageserver_flush(void);
static bool
pageserver_connect(int elevel)
@@ -232,16 +233,17 @@ pageserver_disconnect(void)
}
}
static void
static bool
pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;
int n_reconnect_attempts = 0;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
}
req_buff = nm_pack_request(request);
@@ -252,53 +254,36 @@ pageserver_send(NeonRequest * request)
* See https://github.com/neondatabase/neon/issues/1138
* So try to reestablish connection in case of failure.
*/
while (true)
if (!connected)
{
if (!connected)
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
if (!pageserver_connect(n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS ? LOG : ERROR))
{
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
continue;
}
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
n_reconnect_attempts = 0;
}
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
if (n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS)
{
neon_log(LOG, "failed to send page request (try to reconnect): %s", msg);
if (n_reconnect_attempts != 0) /* do not sleep before first reconnect attempt, assuming that pageserver is already restarted */
pg_usleep(RECONNECT_INTERVAL_USEC);
n_reconnect_attempts += 1;
continue;
}
else
{
pageserver_disconnect();
neon_log(ERROR, "failed to send page request: %s", msg);
}
}
break;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
return false;
}
pfree(req_buff.data);
n_unflushed_requests++;
if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
pageserver_flush();
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = nm_to_string((NeonMessage *) request);
@@ -306,6 +291,7 @@ pageserver_send(NeonRequest * request)
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
return true;
}
static NeonResponse *
@@ -340,16 +326,25 @@ pageserver_receive(void)
}
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
resp = NULL;
}
else if (rc == -2)
neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn)));
{
char* msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc);
{
pageserver_disconnect();
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
PG_RE_THROW();
}
@@ -359,21 +354,25 @@ pageserver_receive(void)
}
static void
static bool
pageserver_flush(void)
{
if (!connected)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
else if (PQflush(pageserver_conn))
else
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
}
}
n_unflushed_requests = 0;
return true;
}
page_server_api api = {
@@ -439,6 +438,14 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomIntVariable("neon.max_reconnect_attempts",
"Maximal attempts to reconnect to pages server (with 1 second timeout)",
NULL,
&max_reconnect_attempts,
10, 0, INT_MAX,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.readahead_buffer_size",
"number of prefetches to buffer",
"This buffer is used to hold and manage prefetched "

View File

@@ -145,9 +145,9 @@ extern char *nm_to_string(NeonMessage * msg);
typedef struct
{
void (*send) (NeonRequest * request);
bool (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
void (*flush) (void);
bool (*flush) (void);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);

View File

@@ -489,7 +489,8 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
page_server->flush();
if (!page_server->flush())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -666,7 +667,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
* smaller than the current WAL insert/redo pointer, which is already
* larger than this prefetch_lsn. So in any case, that would
* invalidate this cache.
*
*
* The best LSN to use for effective_request_lsn would be
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
*/
@@ -677,7 +678,8 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
page_server->send((NeonRequest *) &request);
while (!page_server->send((NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -687,6 +689,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
@@ -743,6 +746,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
prefetch_set_unused(ring_index);
entry = NULL;
}
}
/* if we don't want the latest version, only accept requests with the exact same LSN */
else
@@ -756,20 +760,23 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
}
}
/*
* We received a prefetch for a page that was recently read and
* removed from the buffers. Remove that request from the buffers.
*/
else if (slot->status == PRFS_TAG_REMAINS)
if (entry != NULL)
{
prefetch_set_unused(ring_index);
entry = NULL;
}
else
{
/* The buffered request is good enough, return that index */
pgBufferUsage.prefetch.duplicates++;
return ring_index;
/*
* We received a prefetch for a page that was recently read and
* removed from the buffers. Remove that request from the buffers.
*/
if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(ring_index);
entry = NULL;
}
else
{
/* The buffered request is good enough, return that index */
pgBufferUsage.prefetch.duplicates++;
return ring_index;
}
}
}
@@ -859,8 +866,7 @@ page_server_request(void const *req)
{
NeonResponse* resp;
do {
page_server->send((NeonRequest *) req);
page_server->flush();
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
resp = page_server->receive();

123
poetry.lock generated
View File

@@ -1654,71 +1654,74 @@ test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
[[package]]
name = "psycopg2-binary"
version = "2.9.3"
version = "2.9.6"
description = "psycopg2 - Python-PostgreSQL Database Adapter"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "psycopg2-binary-2.9.3.tar.gz", hash = "sha256:761df5313dc15da1502b21453642d7599d26be88bff659382f8f9747c7ebea4e"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:539b28661b71da7c0e428692438efbcd048ca21ea81af618d845e06ebfd29478"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2f2534ab7dc7e776a263b463a16e189eb30e85ec9bbe1bff9e78dae802608932"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e82d38390a03da28c7985b394ec3f56873174e2c88130e6966cb1c946508e65"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:57804fc02ca3ce0dbfbef35c4b3a4a774da66d66ea20f4bda601294ad2ea6092"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_24_aarch64.whl", hash = "sha256:083a55275f09a62b8ca4902dd11f4b33075b743cf0d360419e2051a8a5d5ff76"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_24_ppc64le.whl", hash = "sha256:0a29729145aaaf1ad8bafe663131890e2111f13416b60e460dae0a96af5905c9"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:3a79d622f5206d695d7824cbf609a4f5b88ea6d6dab5f7c147fc6d333a8787e4"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:090f3348c0ab2cceb6dfbe6bf721ef61262ddf518cd6cc6ecc7d334996d64efa"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:a9e1f75f96ea388fbcef36c70640c4efbe4650658f3d6a2967b4cc70e907352e"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c3ae8e75eb7160851e59adc77b3a19a976e50622e44fd4fd47b8b18208189d42"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-win32.whl", hash = "sha256:7b1e9b80afca7b7a386ef087db614faebbf8839b7f4db5eb107d0f1a53225029"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-win_amd64.whl", hash = "sha256:8b344adbb9a862de0c635f4f0425b7958bf5a4b927c8594e6e8d261775796d53"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:e847774f8ffd5b398a75bc1c18fbb56564cda3d629fe68fd81971fece2d3c67e"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:68641a34023d306be959101b345732360fc2ea4938982309b786f7be1b43a4a1"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3303f8807f342641851578ee7ed1f3efc9802d00a6f83c101d21c608cb864460"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-manylinux_2_24_aarch64.whl", hash = "sha256:e3699852e22aa68c10de06524a3721ade969abf382da95884e6a10ff798f9281"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-manylinux_2_24_ppc64le.whl", hash = "sha256:526ea0378246d9b080148f2d6681229f4b5964543c170dd10bf4faaab6e0d27f"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:b1c8068513f5b158cf7e29c43a77eb34b407db29aca749d3eb9293ee0d3103ca"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:15803fa813ea05bef089fa78835118b5434204f3a17cb9f1e5dbfd0b9deea5af"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:152f09f57417b831418304c7f30d727dc83a12761627bb826951692cc6491e57"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:404224e5fef3b193f892abdbf8961ce20e0b6642886cfe1fe1923f41aaa75c9d"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-win32.whl", hash = "sha256:1f6b813106a3abdf7b03640d36e24669234120c72e91d5cbaeb87c5f7c36c65b"},
{file = "psycopg2_binary-2.9.3-cp36-cp36m-win_amd64.whl", hash = "sha256:2d872e3c9d5d075a2e104540965a1cf898b52274a5923936e5bfddb58c59c7c2"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:10bb90fb4d523a2aa67773d4ff2b833ec00857f5912bafcfd5f5414e45280fb1"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:874a52ecab70af13e899f7847b3e074eeb16ebac5615665db33bce8a1009cf33"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a29b3ca4ec9defec6d42bf5feb36bb5817ba3c0230dd83b4edf4bf02684cd0ae"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-manylinux_2_24_aarch64.whl", hash = "sha256:12b11322ea00ad8db8c46f18b7dfc47ae215e4df55b46c67a94b4effbaec7094"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-manylinux_2_24_ppc64le.whl", hash = "sha256:53293533fcbb94c202b7c800a12c873cfe24599656b341f56e71dd2b557be063"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:c381bda330ddf2fccbafab789d83ebc6c53db126e4383e73794c74eedce855ef"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:9d29409b625a143649d03d0fd7b57e4b92e0ecad9726ba682244b73be91d2fdb"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:183a517a3a63503f70f808b58bfbf962f23d73b6dccddae5aa56152ef2bcb232"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:15c4e4cfa45f5a60599d9cec5f46cd7b1b29d86a6390ec23e8eebaae84e64554"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-win32.whl", hash = "sha256:adf20d9a67e0b6393eac162eb81fb10bc9130a80540f4df7e7355c2dd4af9fba"},
{file = "psycopg2_binary-2.9.3-cp37-cp37m-win_amd64.whl", hash = "sha256:2f9ffd643bc7349eeb664eba8864d9e01f057880f510e4681ba40a6532f93c71"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:def68d7c21984b0f8218e8a15d514f714d96904265164f75f8d3a70f9c295667"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e6aa71ae45f952a2205377773e76f4e3f27951df38e69a4c95440c779e013560"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dffc08ca91c9ac09008870c9eb77b00a46b3378719584059c034b8945e26b272"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:280b0bb5cbfe8039205c7981cceb006156a675362a00fe29b16fbc264e242834"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_24_aarch64.whl", hash = "sha256:af9813db73395fb1fc211bac696faea4ca9ef53f32dc0cfa27e4e7cf766dcf24"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_24_ppc64le.whl", hash = "sha256:63638d875be8c2784cfc952c9ac34e2b50e43f9f0a0660b65e2a87d656b3116c"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:ffb7a888a047696e7f8240d649b43fb3644f14f0ee229077e7f6b9f9081635bd"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:0c9d5450c566c80c396b7402895c4369a410cab5a82707b11aee1e624da7d004"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:d1c1b569ecafe3a69380a94e6ae09a4789bbb23666f3d3a08d06bbd2451f5ef1"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:8fc53f9af09426a61db9ba357865c77f26076d48669f2e1bb24d85a22fb52307"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-win32.whl", hash = "sha256:6472a178e291b59e7f16ab49ec8b4f3bdada0a879c68d3817ff0963e722a82ce"},
{file = "psycopg2_binary-2.9.3-cp38-cp38-win_amd64.whl", hash = "sha256:35168209c9d51b145e459e05c31a9eaeffa9a6b0fd61689b48e07464ffd1a83e"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:47133f3f872faf28c1e87d4357220e809dfd3fa7c64295a4a148bcd1e6e34ec9"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b3a24a1982ae56461cc24f6680604fffa2c1b818e9dc55680da038792e004d18"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:91920527dea30175cc02a1099f331aa8c1ba39bf8b7762b7b56cbf54bc5cce42"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:887dd9aac71765ac0d0bac1d0d4b4f2c99d5f5c1382d8b770404f0f3d0ce8a39"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_24_aarch64.whl", hash = "sha256:1f14c8b0942714eb3c74e1e71700cbbcb415acbc311c730370e70c578a44a25c"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_24_ppc64le.whl", hash = "sha256:7af0dd86ddb2f8af5da57a976d27cd2cd15510518d582b478fbb2292428710b4"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:93cd1967a18aa0edd4b95b1dfd554cf15af657cb606280996d393dadc88c3c35"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:bda845b664bb6c91446ca9609fc69f7db6c334ec5e4adc87571c34e4f47b7ddb"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:01310cf4cf26db9aea5158c217caa92d291f0500051a6469ac52166e1a16f5b7"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:99485cab9ba0fa9b84f1f9e1fef106f44a46ef6afdeec8885e0b88d0772b49e8"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-win32.whl", hash = "sha256:46f0e0a6b5fa5851bbd9ab1bc805eef362d3a230fbdfbc209f4a236d0a7a990d"},
{file = "psycopg2_binary-2.9.3-cp39-cp39-win_amd64.whl", hash = "sha256:accfe7e982411da3178ec690baaceaad3c278652998b2c45828aaac66cd8285f"},
{file = "psycopg2-binary-2.9.6.tar.gz", hash = "sha256:1f64dcfb8f6e0c014c7f55e51c9759f024f70ea572fbdef123f85318c297947c"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d26e0342183c762de3276cca7a530d574d4e25121ca7d6e4a98e4f05cb8e4df7"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c48d8f2db17f27d41fb0e2ecd703ea41984ee19362cbce52c097963b3a1b4365"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffe9dc0a884a8848075e576c1de0290d85a533a9f6e9c4e564f19adf8f6e54a7"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a76e027f87753f9bd1ab5f7c9cb8c7628d1077ef927f5e2446477153a602f2c"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6460c7a99fc939b849431f1e73e013d54aa54293f30f1109019c56a0b2b2ec2f"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ae102a98c547ee2288637af07393dd33f440c25e5cd79556b04e3fca13325e5f"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9972aad21f965599ed0106f65334230ce826e5ae69fda7cbd688d24fa922415e"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:7a40c00dbe17c0af5bdd55aafd6ff6679f94a9be9513a4c7e071baf3d7d22a70"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:cacbdc5839bdff804dfebc058fe25684cae322987f7a38b0168bc1b2df703fb1"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:7f0438fa20fb6c7e202863e0d5ab02c246d35efb1d164e052f2f3bfe2b152bd0"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-win32.whl", hash = "sha256:b6c8288bb8a84b47e07013bb4850f50538aa913d487579e1921724631d02ea1b"},
{file = "psycopg2_binary-2.9.6-cp310-cp310-win_amd64.whl", hash = "sha256:61b047a0537bbc3afae10f134dc6393823882eb263088c271331602b672e52e9"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:964b4dfb7c1c1965ac4c1978b0f755cc4bd698e8aa2b7667c575fb5f04ebe06b"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:afe64e9b8ea66866a771996f6ff14447e8082ea26e675a295ad3bdbffdd72afb"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:15e2ee79e7cf29582ef770de7dab3d286431b01c3bb598f8e05e09601b890081"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dfa74c903a3c1f0d9b1c7e7b53ed2d929a4910e272add6700c38f365a6002820"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b83456c2d4979e08ff56180a76429263ea254c3f6552cd14ada95cff1dec9bb8"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0645376d399bfd64da57148694d78e1f431b1e1ee1054872a5713125681cf1be"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e99e34c82309dd78959ba3c1590975b5d3c862d6f279f843d47d26ff89d7d7e1"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4ea29fc3ad9d91162c52b578f211ff1c931d8a38e1f58e684c45aa470adf19e2"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:4ac30da8b4f57187dbf449294d23b808f8f53cad6b1fc3623fa8a6c11d176dd0"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e78e6e2a00c223e164c417628572a90093c031ed724492c763721c2e0bc2a8df"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-win32.whl", hash = "sha256:1876843d8e31c89c399e31b97d4b9725a3575bb9c2af92038464231ec40f9edb"},
{file = "psycopg2_binary-2.9.6-cp311-cp311-win_amd64.whl", hash = "sha256:b4b24f75d16a89cc6b4cdff0eb6a910a966ecd476d1e73f7ce5985ff1328e9a6"},
{file = "psycopg2_binary-2.9.6-cp36-cp36m-win32.whl", hash = "sha256:498807b927ca2510baea1b05cc91d7da4718a0f53cb766c154c417a39f1820a0"},
{file = "psycopg2_binary-2.9.6-cp36-cp36m-win_amd64.whl", hash = "sha256:0d236c2825fa656a2d98bbb0e52370a2e852e5a0ec45fc4f402977313329174d"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:34b9ccdf210cbbb1303c7c4db2905fa0319391bd5904d32689e6dd5c963d2ea8"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:84d2222e61f313c4848ff05353653bf5f5cf6ce34df540e4274516880d9c3763"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:30637a20623e2a2eacc420059be11527f4458ef54352d870b8181a4c3020ae6b"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8122cfc7cae0da9a3077216528b8bb3629c43b25053284cc868744bfe71eb141"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:38601cbbfe600362c43714482f43b7c110b20cb0f8172422c616b09b85a750c5"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:c7e62ab8b332147a7593a385d4f368874d5fe4ad4e341770d4983442d89603e3"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:2ab652e729ff4ad76d400df2624d223d6e265ef81bb8aa17fbd63607878ecbee"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:c83a74b68270028dc8ee74d38ecfaf9c90eed23c8959fca95bd703d25b82c88e"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:d4e6036decf4b72d6425d5b29bbd3e8f0ff1059cda7ac7b96d6ac5ed34ffbacd"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-win32.whl", hash = "sha256:a8c28fd40a4226b4a84bdf2d2b5b37d2c7bd49486b5adcc200e8c7ec991dfa7e"},
{file = "psycopg2_binary-2.9.6-cp37-cp37m-win_amd64.whl", hash = "sha256:51537e3d299be0db9137b321dfb6a5022caaab275775680e0c3d281feefaca6b"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:cf4499e0a83b7b7edcb8dabecbd8501d0d3a5ef66457200f77bde3d210d5debb"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7e13a5a2c01151f1208d5207e42f33ba86d561b7a89fca67c700b9486a06d0e2"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0e0f754d27fddcfd74006455b6e04e6705d6c31a612ec69ddc040a5468e44b4e"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d57c3fd55d9058645d26ae37d76e61156a27722097229d32a9e73ed54819982a"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:71f14375d6f73b62800530b581aed3ada394039877818b2d5f7fc77e3bb6894d"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:441cc2f8869a4f0f4bb408475e5ae0ee1f3b55b33f350406150277f7f35384fc"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:65bee1e49fa6f9cf327ce0e01c4c10f39165ee76d35c846ade7cb0ec6683e303"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:af335bac6b666cc6aea16f11d486c3b794029d9df029967f9938a4bed59b6a19"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:cfec476887aa231b8548ece2e06d28edc87c1397ebd83922299af2e051cf2827"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:65c07febd1936d63bfde78948b76cd4c2a411572a44ac50719ead41947d0f26b"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-win32.whl", hash = "sha256:4dfb4be774c4436a4526d0c554af0cc2e02082c38303852a36f6456ece7b3503"},
{file = "psycopg2_binary-2.9.6-cp38-cp38-win_amd64.whl", hash = "sha256:02c6e3cf3439e213e4ee930308dc122d6fb4d4bea9aef4a12535fbd605d1a2fe"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e9182eb20f41417ea1dd8e8f7888c4d7c6e805f8a7c98c1081778a3da2bee3e4"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8a6979cf527e2603d349a91060f428bcb135aea2be3201dff794813256c274f1"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8338a271cb71d8da40b023a35d9c1e919eba6cbd8fa20a54b748a332c355d896"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e3ed340d2b858d6e6fb5083f87c09996506af483227735de6964a6100b4e6a54"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f81e65376e52f03422e1fb475c9514185669943798ed019ac50410fb4c4df232"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bfb13af3c5dd3a9588000910178de17010ebcccd37b4f9794b00595e3a8ddad3"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:4c727b597c6444a16e9119386b59388f8a424223302d0c06c676ec8b4bc1f963"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:4d67fbdaf177da06374473ef6f7ed8cc0a9dc640b01abfe9e8a2ccb1b1402c1f"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:0892ef645c2fabb0c75ec32d79f4252542d0caec1d5d949630e7d242ca4681a3"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:02c0f3757a4300cf379eb49f543fb7ac527fb00144d39246ee40e1df684ab514"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-win32.whl", hash = "sha256:c3dba7dab16709a33a847e5cd756767271697041fbe3fe97c215b1fc1f5c9848"},
{file = "psycopg2_binary-2.9.6-cp39-cp39-win_amd64.whl", hash = "sha256:f6a88f384335bb27812293fdb11ac6aee2ca3f51d3c7820fe03de0a304ab6249"},
]
[[package]]

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
atty.workspace = true
base64.workspace = true
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
@@ -30,6 +29,7 @@ metrics.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
parking_lot.workspace = true
pbkdf2.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
pq_proto.workspace = true
@@ -38,6 +38,7 @@ rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest-middleware.workspace = true
reqwest-retry.workspace = true
reqwest-tracing.workspace = true
routerify.workspace = true
rustls-pemfile.workspace = true

View File

@@ -136,18 +136,17 @@ impl Default for ConnCfg {
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> {
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
let connection_timeout = Duration::from_millis(10000);
tokio::time::timeout(connection_timeout, TcpStream::connect((host, port))).map(
tokio::time::timeout(timeout, TcpStream::connect((host, port))).map(
move |res| match res {
Ok(tcpstream_connect_res) => tcpstream_connect_res,
Err(_) => Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("exceeded connection timeout {connection_timeout:?}"),
format!("exceeded connection timeout {timeout:?}"),
)),
},
)
@@ -223,8 +222,9 @@ impl ConnCfg {
async fn do_connect(
&self,
allow_self_signed_compute: bool,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw().await?;
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
@@ -264,8 +264,9 @@ impl ConnCfg {
pub async fn connect(
&self,
allow_self_signed_compute: bool,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
self.do_connect(allow_self_signed_compute)
self.do_connect(allow_self_signed_compute, timeout)
.inspect_err(|err| {
// Immediately log the error we have at our disposal.
error!("couldn't connect to compute node: {err}");

View File

@@ -212,7 +212,7 @@ pub struct CacheOptions {
impl CacheOptions {
/// Default options for [`crate::auth::caches::NodeInfoCache`].
pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=5m";
pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=4m";
/// Parse cache options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_NODE_INFO`].

View File

@@ -8,7 +8,7 @@ use postgres_backend::{self, AuthType, PostgresBackend, PostgresBackendTCP, Quer
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::future;
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, info_span};
use tracing::{error, info, info_span, Instrument};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
@@ -44,19 +44,30 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
.set_nodelay(true)
.context("failed to set client socket option")?;
tokio::task::spawn(async move {
let span = info_span!("mgmt", peer = %peer_addr);
let _enter = span.enter();
let span = info_span!("mgmt", peer = %peer_addr);
info!("started a new console management API thread");
scopeguard::defer! {
info!("console management API thread is about to finish");
}
tokio::task::spawn(
async move {
info!("serving a new console management API connection");
if let Err(e) = handle_connection(socket).await {
error!("thread failed with an error: {e}");
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
});
.instrument(span),
);
}
}
@@ -77,14 +88,14 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
pgb: &mut PostgresBackendTCP,
query: &str,
) -> Result<(), QueryError> {
try_process_query(pgb, query).await.map_err(|e| {
try_process_query(pgb, query).map_err(|e| {
error!("failed to process response: {e:?}");
e
})
}
}
async fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;
let span = info_span!("event", session_id = resp.session_id);

View File

@@ -2,12 +2,16 @@
//! Other modules should use stuff from this module instead of
//! directly relying on deps like `reqwest` (think loose coupling).
pub mod conn_pool;
pub mod server;
pub mod sql_over_http;
pub mod websocket;
use std::time::Duration;
pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use crate::url::ApiUrl;
use reqwest_middleware::RequestBuilder;
@@ -21,6 +25,24 @@ pub fn new_client() -> ClientWithMiddleware {
.build()
}
pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
let timeout_client = reqwest::ClientBuilder::new()
.timeout(default_timout)
.build()
.expect("Failed to create http client with timeout");
let retry_policy =
ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
reqwest_middleware::ClientBuilder::new(timeout_client)
.with(reqwest_tracing::TracingMiddleware::default())
// As per docs, "This middleware always errors when given requests with streaming bodies".
// That's all right because we only use this client to send `serde_json::RawValue`, which
// is not a stream.
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build()
}
/// Thin convenience wrapper for an API provided by an http endpoint.
#[derive(Debug, Clone)]
pub struct Endpoint {

278
proxy/src/http/conn_pool.rs Normal file
View File

@@ -0,0 +1,278 @@
use parking_lot::Mutex;
use pq_proto::StartupMessageParams;
use std::fmt;
use std::{collections::HashMap, sync::Arc};
use futures::TryFutureExt;
use crate::config;
use crate::{auth, console};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use tracing::error;
use tracing::info;
pub const APP_NAME: &str = "sql_over_http";
const MAX_CONNS_PER_ENDPOINT: usize = 20;
#[derive(Debug)]
pub struct ConnInfo {
pub username: String,
pub dbname: String,
pub hostname: String,
pub password: String,
}
impl ConnInfo {
// hm, change to hasher to avoid cloning?
pub fn db_and_user(&self) -> (String, String) {
(self.dbname.clone(), self.username.clone())
}
}
impl fmt::Display for ConnInfo {
// use custom display to avoid logging password
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}@{}/{}", self.username, self.hostname, self.dbname)
}
}
struct ConnPoolEntry {
conn: tokio_postgres::Client,
_last_access: std::time::Instant,
}
// Per-endpoint connection pool, (dbname, username) -> Vec<ConnPoolEntry>
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub struct EndpointConnPool {
pools: HashMap<(String, String), Vec<ConnPoolEntry>>,
total_conns: usize,
}
pub struct GlobalConnPool {
// endpoint -> per-endpoint connection pool
//
// That should be a fairly conteded map, so return reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: Mutex<HashMap<String, Arc<Mutex<EndpointConnPool>>>>,
// Maximum number of connections per one endpoint.
// Can mix different (dbname, username) connections.
// When running out of free slots for a particular endpoint,
// falls back to opening a new connection for each request.
max_conns_per_endpoint: usize,
proxy_config: &'static crate::config::ProxyConfig,
}
impl GlobalConnPool {
pub fn new(config: &'static crate::config::ProxyConfig) -> Arc<Self> {
Arc::new(Self {
global_pool: Mutex::new(HashMap::new()),
max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT,
proxy_config: config,
})
}
pub async fn get(
&self,
conn_info: &ConnInfo,
force_new: bool,
) -> anyhow::Result<tokio_postgres::Client> {
let mut client: Option<tokio_postgres::Client> = None;
if !force_new {
let pool = self.get_endpoint_pool(&conn_info.hostname).await;
// find a pool entry by (dbname, username) if exists
let mut pool = pool.lock();
let pool_entries = pool.pools.get_mut(&conn_info.db_and_user());
if let Some(pool_entries) = pool_entries {
if let Some(entry) = pool_entries.pop() {
client = Some(entry.conn);
pool.total_conns -= 1;
}
}
}
// ok return cached connection if found and establish a new one otherwise
if let Some(client) = client {
if client.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
connect_to_compute(self.proxy_config, conn_info).await
} else {
info!("pool: reusing connection '{conn_info}'");
Ok(client)
}
} else {
info!("pool: opening a new connection '{conn_info}'");
connect_to_compute(self.proxy_config, conn_info).await
}
}
pub async fn put(
&self,
conn_info: &ConnInfo,
client: tokio_postgres::Client,
) -> anyhow::Result<()> {
let pool = self.get_endpoint_pool(&conn_info.hostname).await;
// return connection to the pool
let mut total_conns;
let mut returned = false;
let mut per_db_size = 0;
{
let mut pool = pool.lock();
total_conns = pool.total_conns;
let pool_entries: &mut Vec<ConnPoolEntry> = pool
.pools
.entry(conn_info.db_and_user())
.or_insert_with(|| Vec::with_capacity(1));
if total_conns < self.max_conns_per_endpoint {
pool_entries.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
total_conns += 1;
returned = true;
per_db_size = pool_entries.len();
pool.total_conns += 1;
}
}
// do logging outside of the mutex
if returned {
info!("pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!("pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
}
async fn get_endpoint_pool(&self, endpoint: &String) -> Arc<Mutex<EndpointConnPool>> {
// find or create a pool for this endpoint
let mut created = false;
let mut global_pool = self.global_pool.lock();
let pool = global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
Arc::new(Mutex::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
}))
})
.clone();
let global_pool_size = global_pool.len();
drop(global_pool);
// log new global pool size
if created {
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
}
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
//
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let credential_params = StartupMessageParams::new([
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
]);
let creds = config
.auth_backend
.as_ref()
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
// This code is a copy of `connect_to_compute` from `src/proxy.rs` with
// the difference that it uses `tokio_postgres` for the connection.
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(&extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}

View File

@@ -1,25 +1,21 @@
use std::sync::Arc;
use futures::pin_mut;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::{Body, HeaderMap, Request};
use pq_proto::StartupMessageParams;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use crate::proxy::invalidate_cache;
use crate::proxy::NUM_RETRIES_WAKE_COMPUTE;
use crate::{auth, config::ProxyConfig, console};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
#[derive(serde::Deserialize)]
struct QueryData {
@@ -27,12 +23,13 @@ struct QueryData {
params: Vec<serde_json::Value>,
}
const APP_NAME: &str = "sql_over_http";
const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
//
@@ -96,13 +93,6 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
}
}
struct ConnInfo {
username: String,
dbname: String,
hostname: String,
password: String,
}
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
@@ -169,50 +159,23 @@ fn get_conn_info(
// TODO: return different http error codes
pub async fn handle(
config: &'static ProxyConfig,
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
) -> anyhow::Result<Value> {
//
// Determine the destination and connection params
//
let headers = request.headers();
let conn_info = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
]);
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
let array_mode = headers.get(&ARRAY_MODE) == Some(&HEADER_VALUE_TRUE);
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
//
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let creds = config
.auth_backend
.as_ref()
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let mut node_info = creds.wake_compute(&extra).await?.expect("msg");
// Allow connection pooling only if explicitly requested
let allow_pool = headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
@@ -235,7 +198,8 @@ pub async fn handle(
//
// Now execute the query and return the result
//
let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?;
let client = conn_pool.get(&conn_info, !allow_pool).await?;
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging
@@ -292,6 +256,13 @@ pub async fn handle(
.map(|row| pg_text_row_to_json(row, raw_output, array_mode))
.collect::<Result<Vec<_>, _>>()?;
if allow_pool {
// return connection to the pool
tokio::task::spawn(async move {
let _ = conn_pool.put(&conn_info, client).await;
});
}
// resulting JSON format is based on the format of node-postgres result
Ok(json!({
"command": command_tag_name,
@@ -302,70 +273,6 @@ pub async fn handle(
}))
}
/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with
/// the difference that it uses `tokio_postgres` for the connection.
#[instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
loop {
match connect_to_compute_once(node_info, conn_info).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).await? {
// Update `node_info` and try one more time.
Some(new) => {
*node_info = new;
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
}
}
other => return other,
}
num_retries -= 1;
info!("retrying after wake-up ({num_retries} attempts left)");
}
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.inspect_err(|e: &tokio_postgres::Error| {
error!(
"failed to connect to compute node hosts={:?} ports={:?}: {}",
node_info.config.get_hosts(),
node_info.config.get_ports(),
e
);
invalidate_cache(node_info)
})
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
});
Ok(client)
}
//
// Convert postgres row with text-encoded values to JSON object
//

View File

@@ -35,7 +35,7 @@ use utils::http::{error::ApiError, json::json_response};
// Tracking issue: https://github.com/rust-lang/rust/issues/98407.
use sync_wrapper::SyncWrapper;
use super::sql_over_http;
use super::{conn_pool::GlobalConnPool, sql_over_http};
pin_project! {
/// This is a wrapper around a [`WebSocketStream`] that
@@ -164,6 +164,7 @@ async fn serve_websocket(
async fn ws_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
conn_pool: Arc<GlobalConnPool>,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
@@ -192,7 +193,7 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = sql_over_http::handle(config, request, sni_hostname)
let result = sql_over_http::handle(request, sni_hostname, conn_pool)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
@@ -234,6 +235,8 @@ pub async fn task_main(
info!("websocket server has shut down");
}
let conn_pool: Arc<GlobalConnPool> = GlobalConnPool::new(config);
let tls_config = config.tls_config.as_ref().map(|cfg| cfg.to_server_config());
let tls_acceptor: tokio_rustls::TlsAcceptor = match tls_config {
Some(config) => config.into(),
@@ -258,15 +261,18 @@ pub async fn task_main(
let make_svc =
hyper::service::make_service_fn(|stream: &tokio_rustls::server::TlsStream<AddrStream>| {
let sni_name = stream.get_ref().1.sni_hostname().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, cancel_map, session_id, sni_name)
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = format_args!("{session_id}")

View File

@@ -18,7 +18,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.from_env_lossy();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(atty::is(atty::Stream::Stderr))
.with_ansi(false)
.with_writer(std::io::stderr)
.with_target(false);

View File

@@ -4,11 +4,13 @@ use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};
use tracing::{error, info, instrument, trace, warn};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
///
/// Key that uniquely identifies the object, this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
@@ -30,7 +32,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<()> {
info!("metrics collector has shut down");
}
let http_client = http::new_client();
let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
@@ -182,36 +184,36 @@ async fn collect_metrics_iteration(
}
};
if res.status().is_success() {
// update cached metrics after they were sent successfully
for send_metric in chunk {
let stop_time = match send_metric.kind {
EventType::Incremental { stop_time, .. } => stop_time,
_ => unreachable!(),
};
cached_metrics
.entry(Ids {
endpoint_id: send_metric.extra.endpoint_id.clone(),
branch_id: send_metric.extra.branch_id.clone(),
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 = e.0.saturating_add(send_metric.value);
e.1 = stop_time
})
// cache new metric
.or_insert((send_metric.value, stop_time));
}
} else {
if !res.status().is_success() {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.iter() {
for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
if metric.value > (1u64 << 40) {
error!("potentially abnormal metric value: {:?}", metric);
}
error!("potentially abnormal metric value: {:?}", metric);
}
}
// update cached metrics after they were sent
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
for send_metric in chunk {
let stop_time = match send_metric.kind {
EventType::Incremental { stop_time, .. } => stop_time,
_ => unreachable!(),
};
cached_metrics
.entry(Ids {
endpoint_id: send_metric.extra.endpoint_id.clone(),
branch_id: send_metric.extra.branch_id.clone(),
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 = e.0.saturating_add(send_metric.value);
e.1 = stop_time
})
// cache new metric
.or_insert((send_metric.value, stop_time));
}
}
Ok(())
}

View File

@@ -16,7 +16,10 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
time,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::measured_stream::MeasuredStream;
@@ -305,12 +308,13 @@ pub fn invalidate_cache(node_info: &console::CachedNodeInfo) {
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, compute::ConnectionError> {
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute)
.connect(allow_self_signed_compute, timeout)
.inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info))
.await
}
@@ -328,7 +332,27 @@ async fn connect_to_compute(
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
match connect_to_compute_once(node_info).await {
// Set a shorter timeout for the initial connection attempt.
//
// In case we try to connect to an outdated address that is no longer valid, the
// default behavior of Kubernetes is to drop the packets, causing us to wait for
// the entire timeout period. We want to fail fast in such cases.
//
// A specific case to consider is when we have cached compute node information
// with a 4-minute TTL (Time To Live), but the user has executed a `/suspend` API
// call, resulting in the nonexistence of the compute node.
//
// We only use caching in case of scram proxy backed by the console, so reduce
// the timeout only in that case.
let is_scram_proxy = matches!(creds, auth::BackendType::Console(_, _));
let timeout = if is_scram_proxy && num_retries == NUM_RETRIES_WAKE_COMPUTE {
time::Duration::from_secs(2)
} else {
time::Duration::from_secs(10)
};
match connect_to_compute_once(node_info, timeout).await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).map_err(io_error).await? {

View File

@@ -45,17 +45,74 @@ fn hmac_sha256<'a>(key: &[u8], parts: impl IntoIterator<Item = &'a [u8]>) -> [u8
let mut mac = Hmac::<Sha256>::new_from_slice(key).expect("bad key size");
parts.into_iter().for_each(|s| mac.update(s));
// TODO: maybe newer `hmac` et al already migrated to regular arrays?
let mut result = [0u8; 32];
result.copy_from_slice(mac.finalize().into_bytes().as_slice());
result
mac.finalize().into_bytes().into()
}
fn sha256<'a>(parts: impl IntoIterator<Item = &'a [u8]>) -> [u8; 32] {
let mut hasher = Sha256::new();
parts.into_iter().for_each(|s| hasher.update(s));
let mut result = [0u8; 32];
result.copy_from_slice(hasher.finalize().as_slice());
result
hasher.finalize().into()
}
#[cfg(test)]
mod tests {
use crate::sasl::{Mechanism, Step};
use super::{password::SaltedPassword, Exchange, ServerSecret};
#[test]
fn happy_path() {
let iterations = 4096;
let salt_base64 = "QSXCR+Q6sek8bf92";
let pw = SaltedPassword::new(
b"pencil",
base64::decode(salt_base64).unwrap().as_slice(),
iterations,
);
let secret = ServerSecret {
iterations,
salt_base64: salt_base64.to_owned(),
stored_key: pw.client_key().sha256(),
server_key: pw.server_key(),
doomed: false,
};
const NONCE: [u8; 18] = [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
];
let mut exchange = Exchange::new(&secret, || NONCE, None);
let client_first = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
let client_final = "c=biws,r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,p=rw1r5Kph5ThxmaUBC2GAQ6MfXbPnNkFiTIvdb/Rear0=";
let server_first =
"r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,s=QSXCR+Q6sek8bf92,i=4096";
let server_final = "v=qtUDIofVnIhM7tKn93EQUUt5vgMOldcDVu1HC+OH0o0=";
exchange = match exchange.exchange(client_first).unwrap() {
Step::Continue(exchange, message) => {
assert_eq!(message, server_first);
exchange
}
Step::Success(_, _) => panic!("expected continue, got success"),
Step::Failure(f) => panic!("{f}"),
};
let key = match exchange.exchange(client_final).unwrap() {
Step::Success(key, message) => {
assert_eq!(message, server_final);
key
}
Step::Continue(_, _) => panic!("expected success, got continue"),
Step::Failure(f) => panic!("{f}"),
};
assert_eq!(
key.as_bytes(),
[
74, 103, 1, 132, 12, 31, 200, 48, 28, 54, 82, 232, 207, 12, 138, 189, 40, 32, 134,
27, 125, 170, 232, 35, 171, 167, 166, 41, 70, 228, 182, 112,
]
);
}
}

View File

@@ -14,19 +14,7 @@ impl SaltedPassword {
/// See `scram-common.c : scram_SaltedPassword` for details.
/// Further reading: <https://datatracker.ietf.org/doc/html/rfc2898> (see `PBKDF2`).
pub fn new(password: &[u8], salt: &[u8], iterations: u32) -> SaltedPassword {
let one = 1_u32.to_be_bytes(); // magic
let mut current = super::hmac_sha256(password, [salt, &one]);
let mut result = current;
for _ in 1..iterations {
current = super::hmac_sha256(password, [current.as_ref()]);
// TODO: result = current.zip(result).map(|(x, y)| x ^ y), issue #80094
for (i, x) in current.iter().enumerate() {
result[i] ^= x;
}
}
result.into()
pbkdf2::pbkdf2_hmac_array::<sha2::Sha256, 32>(password, salt, iterations).into()
}
/// Derive `ClientKey` from a salted hashed password.
@@ -46,3 +34,41 @@ impl From<[u8; SALTED_PASSWORD_LEN]> for SaltedPassword {
Self { bytes }
}
}
#[cfg(test)]
mod tests {
use super::SaltedPassword;
fn legacy_pbkdf2_impl(password: &[u8], salt: &[u8], iterations: u32) -> SaltedPassword {
let one = 1_u32.to_be_bytes(); // magic
let mut current = super::super::hmac_sha256(password, [salt, &one]);
let mut result = current;
for _ in 1..iterations {
current = super::super::hmac_sha256(password, [current.as_ref()]);
// TODO: result = current.zip(result).map(|(x, y)| x ^ y), issue #80094
for (i, x) in current.iter().enumerate() {
result[i] ^= x;
}
}
result.into()
}
#[test]
fn pbkdf2() {
let password = "a-very-secure-password";
let salt = "such-a-random-salt";
let iterations = 4096;
let output = [
203, 18, 206, 81, 4, 154, 193, 100, 147, 41, 211, 217, 177, 203, 69, 210, 194, 211,
101, 1, 248, 156, 96, 0, 8, 223, 30, 87, 158, 41, 20, 42,
];
let actual = SaltedPassword::new(password.as_bytes(), salt.as_bytes(), iterations);
let expected = legacy_pbkdf2_impl(password.as_bytes(), salt.as_bytes(), iterations);
assert_eq!(actual.bytes, output);
assert_eq!(actual.bytes, expected.bytes);
}
}

View File

@@ -7,7 +7,7 @@ authors = []
[tool.poetry.dependencies]
python = "^3.9"
pytest = "^7.3.1"
psycopg2-binary = "^2.9.1"
psycopg2-binary = "^2.9.6"
typing-extensions = "^4.6.1"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.31.0"

View File

@@ -191,6 +191,12 @@ impl Storage for FileStorage {
control_partial_path.display()
)
})?;
control_partial.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
control_partial_path.display()
)
})?;
// fsync the file
if !self.conf.no_sync {

View File

@@ -188,6 +188,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
let mut response = client.get(&http_url).send().await?;
while let Some(chunk) = response.chunk().await? {
file.write_all(&chunk).await?;
file.flush().await?;
}
}

View File

@@ -403,16 +403,18 @@ impl SafekeeperPostgresHandler {
};
// take the latest commit_lsn if don't have stop_pos
let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow());
if end_pos < start_pos {
warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos);
end_pos = start_pos;
warn!(
"requested start_pos {} is ahead of available WAL end_pos {}",
start_pos, end_pos
);
}
info!(
"starting streaming from {:?} till {:?}",
start_pos, stop_pos
"starting streaming from {:?} till {:?}, available WAL ends at {}",
start_pos, stop_pos, end_pos
);
// switch to copy
@@ -547,12 +549,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
self.end_pos = *self.commit_lsn_watch_rx.borrow();
if self.end_pos > self.start_pos {
// We have something to send.
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? {
self.end_pos = lsn;
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}

View File

@@ -248,6 +248,10 @@ impl PhysicalStorage {
};
file.write_all(buf).await?;
// Note: flush just ensures write above reaches the OS (this is not
// needed in case of sync IO as Write::write there calls directly write
// syscall, but needed in case of async). It does *not* fsyncs the file.
file.flush().await?;
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
@@ -716,6 +720,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count]).await?;
file.flush().await?;
Ok(())
}

View File

@@ -0,0 +1,20 @@
import json
import os
with open("SOMETHING", "w") as f:
f.write("SOMETHING")
# enable custom extensions for specific tenants
enabled_extensions = {"123454321": ["anon"], "public": ["embedding"]}
control_data = {}
os.chdir("control_files")
for control_file in os.listdir("."):
ext_name = control_file.replace(".control", "")
with open(control_file, "r") as f:
control_data[ext_name] = f.read()
all_data = {"enabled_extensions": enabled_extensions, "control_data": control_data}
with open("ext_index.json", "w") as f:
json.dump(all_data, f)

View File

@@ -144,17 +144,24 @@ const reportSummary = async (params) => {
}
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}
const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name)
const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"`
summary += "```\n"
summary += `# Run failed on Postgres ${pgVersion} tests locally:\n`
summary += `${command}\n`
summary += "```\n"
}
}
if (failedTestsCount > 0) {
const testsToRerun = []
for (const pgVersion of Object.keys(failedTests)) {
for (const testName of Object.keys(failedTests[pgVersion])) {
testsToRerun.push(...failedTests[pgVersion][testName].map(test => test.name))
}
}
const command = `scripts/pytest -vv -n $(nproc) -k "${testsToRerun.join(" or ")}"`
summary += "```\n"
summary += `# Run all failed tests locally:\n`
summary += `${command}\n`
summary += "```\n"
}
if (flakyTestsCount > 0) {
summary += `<details>\n<summary>Flaky tests (${flakyTestsCount})</summary>\n\n`
for (const pgVersion of Array.from(pgVersions).sort().reverse()) {
@@ -164,8 +171,7 @@ const reportSummary = async (params) => {
const links = []
for (const test of tests) {
const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}/retries`
const status = test.status === "passed" ? ":white_check_mark:" : ":x:"
links.push(`[${status} ${test.buildType}](${allureLink})`)
links.push(`[${test.buildType}](${allureLink})`)
}
summary += `- \`${testName}\`: ${links.join(", ")}\n`
}

View File

@@ -1,6 +1,6 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.allure",
"fixtures.parametrize",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -1,25 +0,0 @@
import os
import pytest
from fixtures.pg_version import DEFAULT_VERSION, PgVersion
"""
Set of utilities to make Allure report more informative.
- It adds BUILD_TYPE and DEFAULT_PG_VERSION to the test names (only in test_runner/regress)
to make tests distinguishable in Allure report.
"""
@pytest.fixture(scope="function", autouse=True)
def allure_noop():
pass
def pytest_generate_tests(metafunc):
if "test_runner/regress" in metafunc.definition._nodeid:
build_type = os.environ.get("BUILD_TYPE", "DEBUG").lower()
pg_version = PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION))
metafunc.parametrize("allure_noop", [f"{build_type}-pg{pg_version}"])

View File

@@ -59,6 +59,10 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_page_cache_read_hits_total",
"pageserver_page_cache_read_accesses_total",
"pageserver_page_cache_size_current_bytes",
"pageserver_page_cache_size_max_bytes",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",

View File

@@ -102,8 +102,8 @@ def base_dir() -> Iterator[Path]:
yield base_dir
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path) -> Iterator[Path]:
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
@@ -113,7 +113,6 @@ def neon_binpath(base_dir: Path) -> Iterator[Path]:
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
build_type = os.environ.get("BUILD_TYPE", "debug")
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
@@ -123,7 +122,7 @@ def neon_binpath(base_dir: Path) -> Iterator[Path]:
yield binpath
@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
@@ -147,7 +146,7 @@ def top_output_dir(base_dir: Path) -> Iterator[Path]:
yield output_dir
@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
@@ -174,7 +173,23 @@ def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "fu
def myfixture(...)
...
"""
return "function" if os.environ.get("TEST_SHARED_FIXTURES") is None else "session"
scope: Literal["session", "function"]
if os.environ.get("TEST_SHARED_FIXTURES") is None:
# Create the environment in the per-test output directory
scope = "function"
elif (
os.environ.get("BUILD_TYPE") is not None
and os.environ.get("DEFAULT_PG_VERSION") is not None
):
scope = "session"
else:
pytest.fail(
"Shared environment(TEST_SHARED_FIXTURES) requires BUILD_TYPE and DEFAULT_PG_VERSION to be set",
pytrace=False,
)
return scope
@pytest.fixture(scope="session")

View File

@@ -21,6 +21,18 @@ class PageserverApiException(Exception):
self.status_code = status_code
class TimelineCreate406(PageserverApiException):
def __init__(self, res: requests.Response):
assert res.status_code == 406
super().__init__(res.json()["msg"], res.status_code)
class TimelineCreate409(PageserverApiException):
def __init__(self, res: requests.Response):
assert res.status_code == 409
super().__init__("", res.status_code)
@dataclass
class InMemoryLayerInfo:
kind: str
@@ -309,9 +321,12 @@ class PageserverHttpClient(requests.Session):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
raise TimelineCreate409(res)
if res.status_code == 406:
raise TimelineCreate406(res)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)

View File

@@ -0,0 +1,50 @@
import os
from typing import Optional
import pytest
from _pytest.fixtures import FixtureRequest
from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
"""
Dynamically parametrize tests by Postgres version and build type (debug/release/remote)
"""
@pytest.fixture(scope="function", autouse=True)
def pg_version(request: FixtureRequest) -> Optional[PgVersion]:
# Do not parametrize performance tests yet, we need to prepare grafana charts first
if "test_runner/performance" in str(request.node.path):
v = os.environ.get("DEFAULT_PG_VERSION")
return PgVersion(v)
return None
@pytest.fixture(scope="function", autouse=True)
def build_type(request: FixtureRequest) -> Optional[str]:
# Do not parametrize performance tests yet, we need to prepare grafana charts first
if "test_runner/performance" in str(request.node.path):
return os.environ.get("BUILD_TYPE", "").lower()
return None
def pytest_generate_tests(metafunc: Metafunc):
# Do not parametrize performance tests yet, we need to prepare grafana charts first
if "test_runner/performance" in metafunc.definition._nodeid:
return
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
if (bt := os.environ.get("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))

View File

@@ -1,12 +1,10 @@
import enum
import os
from typing import Iterator, Optional
from typing import Optional
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from pytest import FixtureRequest
from fixtures.log_helper import log
"""
This fixture is used to determine which version of Postgres to use for tests.
@@ -75,18 +73,10 @@ def pytest_addoption(parser: Parser):
"--pg-version",
action="store",
type=PgVersion,
help="Postgres version to use for tests",
help="DEPRECATED: Postgres version to use for tests",
)
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version command-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default version"
log.info(f"pg_version is {version} ({source})")
yield version
def pytest_configure(config: Config):
if config.getoption("--pg-version"):
raise Exception("--pg-version is deprecated, use DEFAULT_PG_VERSION env var instead")

View File

@@ -52,6 +52,7 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
"wait_for_spec_ms": f"{i}_wait_for_spec",
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
"basebackup_ms": f"{i}_basebackup",
"start_postgres_ms": f"{i}_start_postgres",
"config_ms": f"{i}_config",
"total_startup_ms": f"{i}_total_startup",
}

View File

@@ -4,7 +4,8 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.types import Lsn
from fixtures.pageserver.http import TimelineCreate406
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar
@@ -173,5 +174,12 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("b1", "b0", tenant_id=tenant, ancestor_start_lsn=lsn)
# retry the same with the HTTP API, so that we can inspect the status code
with pytest.raises(TimelineCreate406):
new_timeline_id = TimelineId.generate()
log.info(
f"Expecting failure for branch behind gc'ing LSN, new_timeline_id={new_timeline_id}"
)
pageserver_http_client.timeline_create(env.pg_version, tenant, new_timeline_id, b0, lsn)
thread.join()

View File

@@ -1,6 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.http import TimelineCreate406
from fixtures.types import Lsn, TimelineId
from fixtures.utils import print_gc_result, query_scalar
@@ -17,7 +18,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.append(".*invalid start lsn .* for ancestor timeline.*")
# Branch at the point where only 100 rows were inserted
env.neon_cli.create_branch("test_branch_behind")
branch_behind_timeline_id = env.neon_cli.create_branch("test_branch_behind")
endpoint_main = env.endpoints.create_start("test_branch_behind")
log.info("postgres is running on 'test_branch_behind' branch")
@@ -89,6 +90,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
assert query_scalar(main_cur, "SELECT count(*) FROM foo") == 400100
# Check bad lsn's for branching
pageserver_http = env.pageserver.http_client()
# branch at segment boundary
env.neon_cli.create_branch(
@@ -97,27 +99,52 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
# branch at pre-initdb lsn (from main branch)
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("test_branch_preinitdb", ancestor_start_lsn=Lsn("0/42"))
# retry the same with the HTTP API, so that we can inspect the status code
with pytest.raises(TimelineCreate406):
new_timeline_id = TimelineId.generate()
log.info(f"Expecting failure for branch pre-initdb LSN, new_timeline_id={new_timeline_id}")
pageserver_http.timeline_create(
env.pg_version, env.initial_tenant, new_timeline_id, env.initial_timeline, Lsn("0/42")
)
# branch at pre-ancestor lsn
with pytest.raises(Exception, match="less than timeline ancestor lsn"):
env.neon_cli.create_branch(
"test_branch_preinitdb", "test_branch_behind", ancestor_start_lsn=Lsn("0/42")
)
# retry the same with the HTTP API, so that we can inspect the status code
with pytest.raises(TimelineCreate406):
new_timeline_id = TimelineId.generate()
log.info(
f"Expecting failure for branch pre-ancestor LSN, new_timeline_id={new_timeline_id}"
)
pageserver_http.timeline_create(
env.pg_version,
env.initial_tenant,
new_timeline_id,
branch_behind_timeline_id,
Lsn("0/42"),
)
# check that we cannot create branch based on garbage collected data
with env.pageserver.http_client() as pageserver_http:
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
# this gced_lsn is pretty random, so if gc is disabled this woudln't fail
env.neon_cli.create_branch(
"test_branch_create_fail", "test_branch_behind", ancestor_start_lsn=gced_lsn
)
# retry the same with the HTTP API, so that we can inspect the status code
with pytest.raises(TimelineCreate406):
new_timeline_id = TimelineId.generate()
log.info(f"Expecting failure for branch behind gc'd LSN, new_timeline_id={new_timeline_id}")
pageserver_http.timeline_create(
env.pg_version, env.initial_tenant, new_timeline_id, branch_behind_timeline_id, gced_lsn
)
# check that after gc everything is still there
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100

View File

@@ -33,6 +33,7 @@ def handle_role(dbs, roles, operation):
dbs[db] = operation["name"]
if "password" in operation:
roles[operation["name"]] = operation["password"]
assert "encrypted_password" in operation
elif operation["op"] == "del":
if "old_name" in operation:
roles.pop(operation["old_name"])

View File

@@ -416,6 +416,7 @@ def test_timeline_physical_size_post_compaction(
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
# shutdown safekeepers to prevent new data from coming in
endpoint.stop() # We can't gracefully stop after safekeepers die
for sk in env.safekeepers:
sk.stop()

View File

@@ -1210,6 +1210,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with conn.cursor() as cur:
cur.execute("INSERT INTO t (key) VALUES (1)")
# Stop all computes gracefully before safekeepers stop responding to them
endpoint_1.stop_and_destroy()
endpoint_3.stop_and_destroy()
# Remove initial tenant's br1 (active)
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"]
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()

View File

@@ -54,7 +54,7 @@ toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt", "json", "smallvec", "tracing-log"] }
url = { version = "2", features = ["serde"] }
[build-dependencies]