Compare commits

...

29 Commits

Author SHA1 Message Date
Stas Kelvich
42ad8df3a9 Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-02-08 17:43:36 +02:00
Stas Kelvich
4a8cfb99bb Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: MMeent <matthias@neon.tech>
2024-02-08 17:39:44 +02:00
Stas Kelvich
04870fad73 Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: MMeent <matthias@neon.tech>
2024-02-08 17:39:30 +02:00
Stas Kelvich
6bb4ab33fa Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Bodobolero <peterbendel@neon.tech>
2024-02-08 17:39:16 +02:00
Stas Kelvich
3cd0b22113 Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Bodobolero <peterbendel@neon.tech>
2024-02-08 17:38:52 +02:00
Stas Kelvich
9661dd2253 Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Bodobolero <peterbendel@neon.tech>
2024-02-08 17:38:45 +02:00
Stas Kelvich
195fc8738b Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Bodobolero <peterbendel@neon.tech>
2024-02-08 17:38:34 +02:00
Stas Kelvich
0945424d51 Update docs/rfcs/032-compute-aux-files.md
Co-authored-by: Bodobolero <peterbendel@neon.tech>
2024-02-08 17:37:43 +02:00
Stas Kelvich
eaa15be7b9 aux files rfc, pass one 2024-02-05 01:40:46 +02:00
Stas Kelvich
be6e86b21d aux files rfc 2024-02-05 01:23:15 +02:00
Vadim Kharitonov
7e8529bec1 Revert "Update pgvector to v0.6.0, third attempt" (#6610)
The issue is still unsolved because of shmem size in VMs. Need to figure it out before applying this patch.

For more details:

```
ERROR:  could not resize shared memory segment "/PostgreSQL.2892504480" to 16774205952 bytes: No space left on device
```

As an example, the same issue in community pgvector/pgvector#453.
2024-02-04 22:27:07 +00:00
Clarence
09519c1773 chore: update wording in docs to improve readability (#6607)
## Problem
 Found typos while reading the docs

## Summary of changes
Fixed the typos found
2024-02-04 19:33:38 +00:00
Joonas Koivunen
9dd69194d4 refactor(proxy): std::io::Write for BytesMut exists (#6606)
Replace TODO with an existing implementation via `BufMut::writer``.
2024-02-03 22:15:59 +00:00
Heikki Linnakangas
647b85fc15 Update pgvector to v0.6.0, third attempt
This includes a compatibility patch that is needed because pgvector
now skips WAL-logging during the index build, and WAL-logs the index
only in one go at the end. That's how GIN, GiST and SP-GIST index
builds work in core PostgreSQL too, but we need some Neon-specific
calls to mark the beginning and end of those build phases.

pgvector is the first index AM that does that with parallel workers,
so I had to modify those functions in the Neon extension to be aware
of parallel workers. Only the leader needs to create the underlying
file and perform the WAL-logging. (In principle, the parallel workers
could participate in the WAL-logging too, but pgvector doesn't do
that. This will need some further work if that changes).

The previous attempt at this (#6592) missed that parallel workers
needed those changes, and segfaulted in parallel build that spilled to
disk.

Testing
-------

We don't have a place for regression tests of extensions at the
moment. I tested this manually with the following script:

```
CREATE EXTENSION IF NOT EXISTS vector;

DROP TABLE IF EXISTS tst;
CREATE TABLE tst (i serial, v vector(3));

INSERT INTO tst (v) SELECT ARRAY[random(), random(), random()] FROM generate_series(1, 15000) g;

-- Serial build, in memory
ALTER TABLE tst SET (parallel_workers=0);
SET maintenance_work_mem='50 MB';
CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);

-- Test that the index works. (The table contents are random, and the
-- search is approximate anyway, so we cannot check the exact values.
-- For now, just eyeball that they look reasonable)
set enable_seqscan=off;
explain SELECT * FROM tst ORDER BY v <-> ARRAY[0, 0, 0]::vector LIMIT 5;
SELECT * FROM tst ORDER BY v <-> ARRAY[0, 0, 0]::vector LIMIT 5;

DROP INDEX idx;

-- Serial build, spills to on disk

ALTER TABLE tst SET (parallel_workers=0);
SET maintenance_work_mem='5 MB';
CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);
SELECT * FROM tst ORDER BY v <-> ARRAY[0, 0, 0]::vector LIMIT 5;
DROP INDEX idx;

-- Parallel build, in memory

ALTER TABLE tst SET (parallel_workers=4);
SET maintenance_work_mem='50 MB';
CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);
SELECT * FROM tst ORDER BY v <-> ARRAY[0, 0, 0]::vector LIMIT 5;
DROP INDEX idx;

-- Parallel build, spills to disk

ALTER TABLE tst SET (parallel_workers=4);
SET maintenance_work_mem='5 MB';
CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);
SELECT * FROM tst ORDER BY v <-> ARRAY[0, 0, 0]::vector LIMIT 5;
DROP INDEX idx;
```
2024-02-03 09:19:37 +02:00
Heikki Linnakangas
c96aead502 Reorganize .dockerignore
Author: Alexander Bayandin <alexander@neon.tech>
2024-02-03 09:19:37 +02:00
Arpad Müller
aac8eb2c36 Minor logging improvements (#6593)
* log when `lsn_by_timestamp` finished together with its result
* add back logging of the layer name as suggested in
https://github.com/neondatabase/neon/pull/6549#discussion_r1475756808
2024-02-03 02:16:20 +01:00
Clarence
3d1b08496a Update words in docs for better readability (#6600)
## Problem
 Found typos while reading the docs

## Summary of changes
Fixed the typos found
2024-02-03 00:59:39 +00:00
Arpad Müller
0ac2606c8a S3 restore test: Use a workaround to enable moto's self-copy support (#6594)
While working on https://github.com/getmoto/moto/pull/7303 I discovered
that if you enable bucket encryption, moto allows self-copies. So we can
un-ignore the test. I tried it out locally, it works great.

Followup of #6533, part of
https://github.com/neondatabase/cloud/issues/8233
2024-02-02 23:45:57 +01:00
Em Sharnoff
d820d64e38 Bump vm-builder v0.21.0 -> v0.23.2 (#6480)
Relevant changes were all from v0.23.0:

- neondatabase/autoscaling#724
- neondatabase/autoscaling#726
- neondatabase/autoscaling#732

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-02-02 22:39:20 +00:00
Arthur Petukhovsky
f2aa96f003 Console split RFC (#1997)
[Rendered](https://github.com/neondatabase/neon/blob/rfc-console-split/docs/rfcs/017-console-split.md)

Co-authored-by: Stas Kelvich <stas.kelvich@gmail.com>
2024-02-02 23:41:55 +02:00
Sasha Krassovsky
2fd8e24c8f Switch sleeps to wait_until (#6575)
## Problem
I didn't know about `wait_until` and was relying on `sleep` to wait for
stuff. This caused some tests to be flaky.
https://github.com/neondatabase/neon/issues/6561
## Summary of changes
Switch to `wait_until`, this should make it tests less flaky
2024-02-02 21:32:40 +00:00
Heikki Linnakangas
c9876b0993 Fix double-free bug in walredo process. (#6534)
At the end of ApplyRecord(), we called pfree on the decoded record, if
it was "oversized". However, we had alread linked it to the "decode
queue" list in XLogReaderState. If we later called XLogBeginRead(), it
called ResetDecoder and tried to free the same record again.

The conditions to hit this are:

- a large WAL record (larger than aboue 64 kB I think, per
DEFAULT_DECODE_BUFFER_SIZE), and
- another WAL record processed by the same WAL redo process after the
large one.

I think the reason we haven't seen this earlier is that you don't get
WAL records that large that are sent to the WAL redo process, except
when logical replication is enabled. Logical replication adds data to
the WAL records, making them larger.

To fix, allocate the buffer ourselves, and don't link it to the decode
queue. Alternatively, we could perhaps have just removed the pfree(),
but frankly I'm a bit scared about the whole queue thing.
2024-02-02 21:49:11 +02:00
John Spray
786e9cf75b control_plane: implement HTTP compute hook for attachment service (#6471)
## Problem

When we change which physical pageservers a tenant is attached to, we
must update the control plane so that it can update computes. This will
be done via an HTTP hook, as described in
https://www.notion.so/neondatabase/Sharding-Service-Control-Plane-interface-6de56dd310a043bfa5c2f5564fa98365#1fe185a35d6d41f0a54279ac1a41bc94

## Summary of changes

- Optional CLI args `--control-plane-jwt-token` and `-compute-hook-url`
are added. If these are set, then we will use this HTTP endpoint,
instead of trying to use neon_local LocalEnv to update compute
configuration.
- Implement an HTTP-driven version of ComputeHook that calls into the
configured URL
- Notify for all tenants on startup, to ensure that we don't miss
notifications if we crash partway through a change, and carry a
`pending_compute_notification` flag at runtime to allow notifications to
fail without risking never sending the update.
- Add a test for all this

One might wonder: why not do a "forever" retry for compute hook
notifications, rather than carrying a flag on the shard to call
reconcile() again later. The reason is that we will later limit
concurreny of reconciles, when dealing with larger numbers of shards,
and if reconcile is stuck waiting for the control plane to accept a
notification request, it could jam up the whole system and prevent us
making other changes. Anyway: from the perspective of the outside world,
we _do_ retry forever, but we don't retry forever within a given
Reconciler lifetime.

The `pending_compute_notification` logic is predicated on later adding a
background task that just calls `Service::reconcile_all` on a schedule
to make sure that anything+everything that can fail a
Reconciler::reconcile call will eventually be retried.
2024-02-02 19:22:03 +00:00
Vadim Kharitonov
0b91edb943 Revert pgvector 0.6.0 (#6592)
It doesn't work in our VMs. Need more time to investigate
2024-02-02 18:36:31 +00:00
John Spray
2e5eab69c6 tests: remove test_gc_cutoff (#6587)
This test became flaky when postgres retry handling was fixed to use
backoff delays -- each iteration in this test's loop was taking much
longer because pgbench doesn't fail until postgres has given up on
retrying to the pageserver.

We are just removing it, because the condition it tests is no longer
risky: we reload all metadata from remote storage on restart, so
crashing directly between making local changes and doing remote uploads
isn't interesting any more.

Closes:  https://github.com/neondatabase/neon/issues/2856
Closes: https://github.com/neondatabase/neon/issues/5329
2024-02-02 18:20:18 +00:00
Joonas Koivunen
caf868e274 test: assert we eventually free space (#6536)
in `test_statvfs_pressure_{usage,min_avail_bytes}` we now race against
initial logical size calculation on-demand downloading the layers. first
wait out the initial logical sizes, then change the final asserts to be
"eventual", which is not great but it is faster than failing and
retrying.

this issue seems to happen only in debug mode tests.

Fixes: #6510
2024-02-02 19:46:47 +02:00
John Spray
7e2436695d storage controller: use AWS Secrets Manager for database URL, etc (#6585)
## Problem

Passing secrets in via CLI/environment is awkward when using helm for
deployment, and not ideal for security (secrets may show up in ps,
/proc).

We can bypass these issues by simply connecting directly to the AWS
Secrets Manager service at runtime.

## Summary of changes

- Add dependency on aws-sdk-secretsmanager
- Update other aws dependencies to latest, to match transitive
dependency versions
- Add `Secrets` type in attachment service, using AWS SDK to load if
secrets are not provided on the command line.
2024-02-02 16:57:11 +00:00
Conrad Ludgate
6506fd14c4 proxy: more refactors (#6526)
## Problem

not really any problem, just some drive-by changes

## Summary of changes

1. move wake compute
2. move json processing
3. move handle_try_wake
4. move test backend to api provider
5. reduce wake-compute concerns
6. remove duplicate wake-compute loop
2024-02-02 16:07:35 +00:00
John Spray
46fb1a90ce pageserver: avoid calculating/sending logical sizes on shard !=0 (#6567)
## Problem

Sharded tenants only maintain accurate relation sizes on shard 0.
Therefore logical size can only be calculated on shard 0. Fortunately it
is also only _needed_ on shard 0, to provide Safekeeper feedback and to
send consumption metrics.

Closes: #6307

## Summary of changes

- Send 0 for logical size to safekeepers on shards !=0
- Skip logical size warmup task on shards !=0
- Skip imitate_layer_accesses on shards !=0
2024-02-02 15:52:03 +00:00
68 changed files with 2388 additions and 1176 deletions

View File

@@ -17,7 +17,6 @@
!libs/
!neon_local/
!pageserver/
!patches/
!pgxn/
!proxy/
!s3_scrubber/

View File

@@ -872,7 +872,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.21.0
VM_BUILDER_VERSION: v0.23.2
steps:
- name: Checkout

243
Cargo.lock generated
View File

@@ -275,6 +275,8 @@ name = "attachment_service"
version = "0.1.0"
dependencies = [
"anyhow",
"aws-config",
"aws-sdk-secretsmanager",
"camino",
"clap",
"control_plane",
@@ -286,6 +288,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"reqwest",
"serde",
"serde_json",
"thiserror",
@@ -304,12 +307,11 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
version = "1.0.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80c950a809d39bc9480207cb1cfc879ace88ea7e3a4392a8e9999e45d6e5692e"
checksum = "8b30c39ebe61f75d1b3785362b1586b41991873c9ab3e317a9181c246fb71d82"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-sdk-sso",
"aws-sdk-ssooidc",
@@ -324,7 +326,7 @@ dependencies = [
"bytes",
"fastrand 2.0.0",
"hex",
"http",
"http 0.2.9",
"hyper",
"ring 0.17.6",
"time",
@@ -335,9 +337,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
version = "1.0.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1317e1a3514b103cf7d5828bbab3b4d30f56bd22d684f8568bc51b6cfbbb1c"
checksum = "33cc49dcdd31c8b6e79850a179af4c367669150c7ac0135f176c61bec81a70f7"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@@ -345,30 +347,13 @@ dependencies = [
"zeroize",
]
[[package]]
name = "aws-http"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "361c4310fdce94328cc2d1ca0c8a48c13f43009c61d3367585685a50ca8c66b6"
dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"http-body",
"pin-project-lite",
"tracing",
]
[[package]]
name = "aws-runtime"
version = "1.0.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ed7ef604a15fd0d4d9e43701295161ea6b504b63c44990ead352afea2bc15e9"
checksum = "eb031bff99877c26c28895766f7bb8484a05e24547e370768d6cc9db514662aa"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-sigv4",
"aws-smithy-async",
"aws-smithy-eventstream",
@@ -376,21 +361,23 @@ dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"fastrand 2.0.0",
"http",
"http 0.2.9",
"http-body",
"percent-encoding",
"pin-project-lite",
"tracing",
"uuid",
]
[[package]]
name = "aws-sdk-s3"
version = "1.4.0"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dcafc2fe52cc30b2d56685e2fa6a879ba50d79704594852112337a472ddbd24"
checksum = "951f7730f51a2155c711c85c79f337fbc02a577fa99d2a0a8059acfce5392113"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-sigv4",
"aws-smithy-async",
@@ -404,23 +391,22 @@ dependencies = [
"aws-smithy-xml",
"aws-types",
"bytes",
"http",
"http 0.2.9",
"http-body",
"once_cell",
"percent-encoding",
"regex",
"regex-lite",
"tracing",
"url",
]
[[package]]
name = "aws-sdk-sso"
version = "1.3.0"
name = "aws-sdk-secretsmanager"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0619ab97a5ca8982e7de073cdc66f93e5f6a1b05afc09e696bec1cb3607cd4df"
checksum = "0a0b64e61e7d632d9df90a2e0f32630c68c24960cab1d27d848718180af883d3"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
@@ -430,19 +416,42 @@ dependencies = [
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"regex",
"fastrand 2.0.0",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-sso"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f486420a66caad72635bc2ce0ff6581646e0d32df02aa39dc983bfe794955a5b"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-ssooidc"
version = "1.3.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04b9f5474cc0f35d829510b2ec8c21e352309b46bf9633c5a81fb9321e9b1c7"
checksum = "39ddccf01d82fce9b4a15c8ae8608211ee7db8ed13a70b514bbfe41df3d24841"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
@@ -452,19 +461,19 @@ dependencies = [
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"regex",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-sts"
version = "1.3.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5700da387716ccfc30b27f44b008f457e1baca5b0f05b6b95455778005e3432a"
checksum = "1a591f8c7e6a621a501b2b5d2e88e1697fcb6274264523a6ad4d5959889a41ce"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
@@ -475,16 +484,17 @@ dependencies = [
"aws-smithy-types",
"aws-smithy-xml",
"aws-types",
"http",
"regex",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sigv4"
version = "1.0.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "380adcc8134ad8bbdfeb2ace7626a869914ee266322965276cbc54066186d236"
checksum = "c371c6b0ac54d4605eb6f016624fb5c7c2925d315fdf600ac1bf21b19d5f1742"
dependencies = [
"aws-credential-types",
"aws-smithy-eventstream",
@@ -496,11 +506,11 @@ dependencies = [
"form_urlencoded",
"hex",
"hmac",
"http",
"http 0.2.9",
"http 1.0.0",
"once_cell",
"p256",
"percent-encoding",
"regex",
"ring 0.17.6",
"sha2",
"subtle",
@@ -511,9 +521,9 @@ dependencies = [
[[package]]
name = "aws-smithy-async"
version = "1.0.2"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e37ca17d25fe1e210b6d4bdf59b81caebfe99f986201a1228cb5061233b4b13"
checksum = "72ee2d09cce0ef3ae526679b522835d63e75fb427aca5413cd371e490d52dcc6"
dependencies = [
"futures-util",
"pin-project-lite",
@@ -522,9 +532,9 @@ dependencies = [
[[package]]
name = "aws-smithy-checksums"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5a373ec01aede3dd066ec018c1bc4e8f5dd11b2c11c59c8eef1a5c68101f397"
checksum = "be2acd1b9c6ae5859999250ed5a62423aedc5cf69045b844432de15fa2f31f2b"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
@@ -532,7 +542,7 @@ dependencies = [
"crc32c",
"crc32fast",
"hex",
"http",
"http 0.2.9",
"http-body",
"md-5",
"pin-project-lite",
@@ -543,9 +553,9 @@ dependencies = [
[[package]]
name = "aws-smithy-eventstream"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c669e1e5fc0d79561bf7a122b118bd50c898758354fe2c53eb8f2d31507cbc3"
checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858"
dependencies = [
"aws-smithy-types",
"bytes",
@@ -554,9 +564,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b1de8aee22f67de467b2e3d0dd0fb30859dc53f579a63bd5381766b987db644"
checksum = "dab56aea3cd9e1101a0a999447fb346afb680ab1406cebc44b32346e25b4117d"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-runtime-api",
@@ -564,7 +574,7 @@ dependencies = [
"bytes",
"bytes-utils",
"futures-core",
"http",
"http 0.2.9",
"http-body",
"once_cell",
"percent-encoding",
@@ -575,18 +585,18 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a46dd338dc9576d6a6a5b5a19bd678dcad018ececee11cf28ecd7588bd1a55c"
checksum = "fd3898ca6518f9215f62678870064398f00031912390efd03f1f6ef56d83aa8e"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-query"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb5b8c7a86d4b6399169670723b7e6f21a39fc833a30f5c5a2f997608178129"
checksum = "bda4b1dfc9810e35fba8a620e900522cd1bd4f9578c446e82f49d1ce41d2e9f9"
dependencies = [
"aws-smithy-types",
"urlencoding",
@@ -594,9 +604,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.0.2"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "273479291efc55e7b0bce985b139d86b6031adb8e50f65c1f712f20ba38f6388"
checksum = "fafdab38f40ad7816e7da5dec279400dd505160780083759f01441af1bbb10ea"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -605,7 +615,7 @@ dependencies = [
"bytes",
"fastrand 2.0.0",
"h2",
"http",
"http 0.2.9",
"http-body",
"hyper",
"hyper-rustls",
@@ -619,14 +629,14 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "1.0.2"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6cebff0d977b6b6feed2fd07db52aac58ba3ccaf26cdd49f1af4add5061bef9"
checksum = "c18276dd28852f34b3bf501f4f3719781f4999a51c7bff1a5c6dc8c4529adc29"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
"bytes",
"http",
"http 0.2.9",
"pin-project-lite",
"tokio",
"tracing",
@@ -635,15 +645,15 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "1.0.2"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7f48b3f27ddb40ab19892a5abda331f403e3cb877965e4e51171447807104af"
checksum = "bb3e134004170d3303718baa2a4eb4ca64ee0a1c0a7041dca31b38be0fb414f3"
dependencies = [
"base64-simd",
"bytes",
"bytes-utils",
"futures-core",
"http",
"http 0.2.9",
"http-body",
"itoa",
"num-integer",
@@ -658,24 +668,24 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.60.0"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ec40d74a67fd395bc3f6b4ccbdf1543672622d905ef3f979689aea5b730cb95"
checksum = "8604a11b25e9ecaf32f9aa56b9fe253c5e2f606a3477f0071e96d3155a5ed218"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "1.0.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8403fc56b1f3761e8efe45771ddc1165e47ec3417c68e68a4519b5cb030159ca"
checksum = "789bbe008e65636fe1b6dbbb374c40c8960d1232b96af5ff4aec349f9c4accf4"
dependencies = [
"aws-credential-types",
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"http",
"http 0.2.9",
"rustc_version",
"tracing",
]
@@ -692,7 +702,7 @@ dependencies = [
"bitflags 1.3.2",
"bytes",
"futures-util",
"http",
"http 0.2.9",
"http-body",
"hyper",
"itoa",
@@ -724,7 +734,7 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http 0.2.9",
"http-body",
"mime",
"rustversion",
@@ -2003,9 +2013,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@@ -2013,9 +2023,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
@@ -2030,9 +2040,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
@@ -2051,9 +2061,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
@@ -2062,15 +2072,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-timer"
@@ -2080,9 +2090,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
@@ -2186,7 +2196,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"http 0.2.9",
"indexmap 2.0.1",
"slab",
"tokio",
@@ -2337,6 +2347,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
@@ -2344,7 +2365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"http 0.2.9",
"pin-project-lite",
]
@@ -2407,7 +2428,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.9",
"http-body",
"httparse",
"httpdate",
@@ -2426,7 +2447,7 @@ version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7"
dependencies = [
"http",
"http 0.2.9",
"hyper",
"log",
"rustls",
@@ -3108,7 +3129,7 @@ dependencies = [
"base64 0.13.1",
"chrono",
"getrandom 0.2.11",
"http",
"http 0.2.9",
"rand 0.8.5",
"serde",
"serde_json",
@@ -3210,7 +3231,7 @@ checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b"
dependencies = [
"async-trait",
"bytes",
"http",
"http 0.2.9",
"opentelemetry_api",
"reqwest",
]
@@ -3223,7 +3244,7 @@ checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275"
dependencies = [
"async-trait",
"futures-core",
"http",
"http 0.2.9",
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry-semantic-conventions",
@@ -4323,6 +4344,12 @@ dependencies = [
"regex-syntax 0.8.2",
]
[[package]]
name = "regex-lite"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e"
[[package]]
name = "regex-syntax"
version = "0.6.29"
@@ -4392,7 +4419,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.9",
"http-body",
"hyper",
"hyper-rustls",
@@ -4433,7 +4460,7 @@ checksum = "4531c89d50effe1fac90d095c8b133c20c5c714204feee0bfc3fd158e784209d"
dependencies = [
"anyhow",
"async-trait",
"http",
"http 0.2.9",
"reqwest",
"serde",
"task-local-extensions",
@@ -4451,7 +4478,7 @@ dependencies = [
"chrono",
"futures",
"getrandom 0.2.11",
"http",
"http 0.2.9",
"hyper",
"parking_lot 0.11.2",
"reqwest",
@@ -4538,7 +4565,7 @@ version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "496c1d3718081c45ba9c31fbfc07417900aa96f4070ff90dc29961836b7a9945"
dependencies = [
"http",
"http 0.2.9",
"hyper",
"lazy_static",
"percent-encoding",
@@ -5868,7 +5895,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.9",
"http-body",
"hyper",
"hyper-timeout",
@@ -6083,7 +6110,7 @@ dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"http 0.2.9",
"httparse",
"log",
"rand 0.8.5",

View File

@@ -48,11 +48,12 @@ azure_storage_blobs = "0.18"
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
aws-config = { version = "1.0", default-features = false, features=["rustls"] }
aws-sdk-s3 = "1.0"
aws-smithy-async = { version = "1.0", default-features = false, features=["rt-tokio"] }
aws-smithy-types = "1.0"
aws-credential-types = "1.0"
aws-config = { version = "1.1.4", default-features = false, features=["rustls"] }
aws-sdk-s3 = "1.14"
aws-sdk-secretsmanager = { version = "1.14.0" }
aws-smithy-async = { version = "1.1.4", default-features = false, features=["rt-tokio"] }
aws-smithy-types = "1.1.4"
aws-credential-types = "1.1.4"
axum = { version = "0.6.20", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"

View File

@@ -241,12 +241,9 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY patches/pgvector.patch /pgvector.patch
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.6.0.tar.gz -O pgvector.tar.gz && \
echo "b0cf4ba1ab016335ac8fb1cada0d2106235889a194fffeece217c5bda90b2f19 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/vector.control

View File

@@ -6,6 +6,8 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
futures.workspace = true
@@ -14,6 +16,7 @@ hyper.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true

View File

@@ -1,24 +1,76 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use pageserver_api::shard::{ShardCount, ShardIndex, TenantShardId};
use hyper::{Method, StatusCode};
use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId};
use postgres_connection::parse_host_port;
use utils::id::{NodeId, TenantId};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::{
backoff::{self},
id::{NodeId, TenantId},
};
use crate::service::Config;
const BUSY_DELAY: Duration = Duration::from_secs(1);
const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
pub(crate) const API_CONCURRENCY: usize = 32;
pub(super) struct ComputeHookTenant {
shards: Vec<(ShardIndex, NodeId)>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ComputeHookNotifyRequestShard {
node_id: NodeId,
shard_number: ShardNumber,
}
/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
shards: Vec<ComputeHookNotifyRequestShard>,
}
/// Error type for attempts to call into the control plane compute notification hook
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotifyError {
// Request was not send successfully, e.g. transport error
#[error("Sending request: {0}")]
Request(#[from] reqwest::Error),
// Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon.
#[error("Control plane tenant busy")]
Busy,
// Explicit 429 response asking us to retry less frequently
#[error("Control plane overloaded")]
SlowDown,
// A 503 response indicates the control plane can't handle the request right now
#[error("Control plane unavailable (status {0})")]
Unavailable(StatusCode),
// API returned unexpected non-success status. We will retry, but log a warning.
#[error("Control plane returned unexpected status {0}")]
Unexpected(StatusCode),
// We shutdown while sending
#[error("Shutting down")]
ShuttingDown,
// A response indicates we will never succeed, such as 400 or 404
#[error("Non-retryable error {0}")]
Fatal(StatusCode),
}
impl ComputeHookTenant {
pub(super) async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> anyhow::Result<()> {
async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
// Find the highest shard count and drop any shards that aren't
// for that shard count.
let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max();
let Some(shard_count) = shard_count else {
// No shards, nothing to do.
tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards");
return Ok(());
return None;
};
self.shards.retain(|(k, _v)| k.shard_count == shard_count);
@@ -26,38 +78,18 @@ impl ComputeHookTenant {
.sort_by_key(|(shard, _node_id)| shard.shard_number);
if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) {
// We have pageservers for all the shards: proceed to reconfigure compute
let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
tracing::warn!(
"Couldn't load neon_local config, skipping compute update ({e})"
);
return Ok(());
}
};
let cplane = ComputeControlPlane::load(env.clone())
.expect("Error loading compute control plane");
let compute_pageservers = self
.shards
.iter()
.map(|(_shard, node_id)| {
let ps_conf = env
.get_pageserver_conf(*node_id)
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
})
.collect::<Vec<_>>();
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id && endpoint.status() == "running" {
tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,);
endpoint.reconfigure(compute_pageservers.clone()).await?;
}
}
// We have pageservers for all the shards: emit a configuration update
return Some(ComputeHookNotifyRequest {
tenant_id,
shards: self
.shards
.iter()
.map(|(shard, node_id)| ComputeHookNotifyRequestShard {
shard_number: shard.shard_number,
node_id: *node_id,
})
.collect(),
});
} else {
tracing::info!(
"ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})",
@@ -66,7 +98,7 @@ impl ComputeHookTenant {
);
}
Ok(())
None
}
}
@@ -74,22 +106,171 @@ impl ComputeHookTenant {
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,
}
impl ComputeHook {
pub(super) fn new() -> Self {
pub(super) fn new(config: Config) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
Self {
state: Default::default(),
config,
authorization_header,
}
}
/// For test environments: use neon_local's LocalEnv to update compute
async fn do_notify_local(
&self,
reconfigure_request: ComputeHookNotifyRequest,
) -> anyhow::Result<()> {
let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
return Ok(());
}
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
let ComputeHookNotifyRequest { tenant_id, shards } = reconfigure_request;
let compute_pageservers = shards
.into_iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
})
.collect::<Vec<_>>();
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,);
endpoint.reconfigure(compute_pageservers.clone()).await?;
}
}
Ok(())
}
async fn do_notify_iteration(
&self,
client: &reqwest::Client,
url: &String,
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = client.request(Method::POST, url);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
tracing::debug!(
"Sending notify request to {} ({:?})",
url,
reconfigure_request
);
let send_result = req.json(&reconfigure_request).send().await;
let response = match send_result {
Ok(r) => r,
Err(e) => return Err(e.into()),
};
// Treat all 2xx responses as success
if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
if response.status() != StatusCode::OK {
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
// log a warning.
tracing::warn!(
"Unexpected 2xx response code {} from control plane",
response.status()
);
}
return Ok(());
}
// Error response codes
match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
// TODO: 429 handling should be global: set some state visible to other requests
// so that they will delay before starting, rather than all notifications trying
// once before backing off.
tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
.await
.ok();
Err(NotifyError::SlowDown)
}
StatusCode::LOCKED => {
// Delay our retry if busy: the usual fast exponential backoff in backoff::retry
// is not appropriate
tokio::time::timeout(BUSY_DELAY, cancel.cancelled())
.await
.ok();
Err(NotifyError::Busy)
}
StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
| StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(NotifyError::Fatal(response.status()))
}
_ => Err(NotifyError::Unexpected(response.status())),
}
}
async fn do_notify(
&self,
url: &String,
reconfigure_request: ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let client = reqwest::Client::new();
backoff::retry(
|| self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
|e| matches!(e, NotifyError::Fatal(_)),
3,
10,
"Send compute notification",
backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown),
)
.await
}
/// Call this to notify the compute (postgres) tier of new pageservers to use
/// for a tenant. notify() is called by each shard individually, and this function
/// will decide whether an update to the tenant is sent. An update is sent on the
/// condition that:
/// - We know a pageserver for every shard.
/// - All the shards have the same shard_count (i.e. we are not mid-split)
///
/// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
/// that is cancelled.
///
/// This function is fallible, including in the case that the control plane is transiently
/// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))]
pub(super) async fn notify(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
) -> anyhow::Result<()> {
tracing::info!("ComputeHook::notify: {}->{}", tenant_shard_id, node_id);
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let mut locked = self.state.lock().await;
let entry = locked
.entry(tenant_shard_id.tenant_id)
@@ -111,6 +292,25 @@ impl ComputeHook {
entry.shards.push((shard_index, node_id));
}
entry.maybe_reconfigure(tenant_shard_id.tenant_id).await
let reconfigure_request = entry.maybe_reconfigure(tenant_shard_id.tenant_id).await;
let Some(reconfigure_request) = reconfigure_request else {
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
// until it does.
tracing::debug!("Tenant isn't yet ready to emit a notification",);
return Ok(());
};
if let Some(notify_url) = &self.config.compute_hook_url {
self.do_notify(notify_url, reconfigure_request, cancel)
.await
} else {
self.do_notify_local(reconfigure_request)
.await
.map_err(|e| {
// This path is for testing only, so munge the error into our prod-style error type.
tracing::error!("Local notification hook failed: {e}");
NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
})
}
}
}

View File

@@ -8,6 +8,7 @@ use anyhow::anyhow;
use attachment_service::http::make_router;
use attachment_service::persistence::Persistence;
use attachment_service::service::{Config, Service};
use aws_config::{self, BehaviorVersion, Region};
use camino::Utf8PathBuf;
use clap::Parser;
use metrics::launch_timestamp::LaunchTimestamp;
@@ -34,9 +35,18 @@ struct Cli {
public_key: Option<camino::Utf8PathBuf>,
/// Token for authenticating this service with the pageservers it controls
#[arg(short, long)]
#[arg(long)]
jwt_token: Option<String>,
/// Token for authenticating this service with the control plane, when calling
/// the compute notification endpoint
#[arg(long)]
control_plane_jwt_token: Option<String>,
/// URL to control plane compute notification endpoint
#[arg(long)]
compute_hook_url: Option<String>,
/// Path to the .json file to store state (will be created if it doesn't exist)
#[arg(short, long)]
path: Option<Utf8PathBuf>,
@@ -46,6 +56,117 @@ struct Cli {
database_url: String,
}
/// Secrets may either be provided on the command line (for testing), or loaded from AWS SecretManager: this
/// type encapsulates the logic to decide which and do the loading.
struct Secrets {
database_url: String,
public_key: Option<JwtAuth>,
jwt_token: Option<String>,
control_plane_jwt_token: Option<String>,
}
impl Secrets {
const DATABASE_URL_SECRET: &'static str = "rds-neon-storage-controller-url";
const PAGESERVER_JWT_TOKEN_SECRET: &'static str =
"neon-storage-controller-pageserver-jwt-token";
const CONTROL_PLANE_JWT_TOKEN_SECRET: &'static str =
"neon-storage-controller-control-plane-jwt-token";
const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key";
async fn load(args: &Cli) -> anyhow::Result<Self> {
if args.database_url.is_empty() {
Self::load_aws_sm().await
} else {
Self::load_cli(args)
}
}
async fn load_aws_sm() -> anyhow::Result<Self> {
let Ok(region) = std::env::var("AWS_REGION") else {
anyhow::bail!("AWS_REGION is not set, cannot load secrets automatically: either set this, or use CLI args to supply secrets");
};
let config = aws_config::defaults(BehaviorVersion::v2023_11_09())
.region(Region::new(region.clone()))
.load()
.await;
let asm = aws_sdk_secretsmanager::Client::new(&config);
let Some(database_url) = asm
.get_secret_value()
.secret_id(Self::DATABASE_URL_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string)
else {
anyhow::bail!(
"Database URL secret not found at {region}/{}",
Self::DATABASE_URL_SECRET
)
};
let jwt_token = asm
.get_secret_value()
.secret_id(Self::PAGESERVER_JWT_TOKEN_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
if jwt_token.is_none() {
tracing::warn!("No pageserver JWT token set: this will only work if authentication is disabled on the pageserver");
}
let control_plane_jwt_token = asm
.get_secret_value()
.secret_id(Self::CONTROL_PLANE_JWT_TOKEN_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
if jwt_token.is_none() {
tracing::warn!("No control plane JWT token set: this will only work if authentication is disabled on the pageserver");
}
let public_key = asm
.get_secret_value()
.secret_id(Self::PUBLIC_KEY_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
let public_key = match public_key {
Some(key) => Some(JwtAuth::from_key(key)?),
None => {
tracing::warn!(
"No public key set: inccoming HTTP requests will not be authenticated"
);
None
}
};
Ok(Self {
database_url,
public_key,
jwt_token,
control_plane_jwt_token,
})
}
fn load_cli(args: &Cli) -> anyhow::Result<Self> {
let public_key = match &args.public_key {
None => None,
Some(key_path) => Some(JwtAuth::from_key_path(key_path)?),
};
Ok(Self {
database_url: args.database_url.clone(),
public_key,
jwt_token: args.jwt_token.clone(),
control_plane_jwt_token: args.control_plane_jwt_token.clone(),
})
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
@@ -66,23 +187,24 @@ async fn main() -> anyhow::Result<()> {
args.listen
);
let secrets = Secrets::load(&args).await?;
let config = Config {
jwt_token: args.jwt_token,
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
compute_hook_url: args.compute_hook_url,
};
let json_path = args.path;
let persistence = Arc::new(Persistence::new(args.database_url, json_path.clone()));
let persistence = Arc::new(Persistence::new(secrets.database_url, json_path.clone()));
let service = Service::spawn(config, persistence.clone()).await?;
let http_listener = tcp_listener::bind(args.listen)?;
let auth = if let Some(public_key_path) = &args.public_key {
let jwt_auth = JwtAuth::from_key_path(public_key_path)?;
Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
} else {
None
};
let auth = secrets
.public_key
.map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth)));
let router = make_router(service, auth)
.build()
.map_err(|err| anyhow!(err))?;

View File

@@ -14,7 +14,7 @@ use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use crate::compute_hook::ComputeHook;
use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
@@ -37,9 +37,15 @@ pub(super) struct Reconciler {
pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,
/// A hook to notify the running postgres instances when we change the location
/// of a tenant
/// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
/// and guarantee eventual retries.
pub(crate) compute_hook: Arc<ComputeHook>,
/// To avoid stalling if the cloud control plane is unavailable, we may proceed
/// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
/// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,
/// A means to abort background reconciliation: it is essential to
/// call this when something changes in the original TenantState that
/// will make this reconciliation impossible or unnecessary, for
@@ -52,7 +58,9 @@ pub(super) struct Reconciler {
}
#[derive(thiserror::Error, Debug)]
pub enum ReconcileError {
pub(crate) enum ReconcileError {
#[error(transparent)]
Notify(#[from] NotifyError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -317,9 +325,19 @@ impl Reconciler {
}
tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);
self.compute_hook
.notify(self.tenant_shard_id, dest_ps_id)
.await?;
// During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
// the origin without notifying compute, we will render the tenant unavailable.
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"
);
}
}
}
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
// this location will be deleted in the general case reconciliation that runs after this.
@@ -400,15 +418,7 @@ impl Reconciler {
wanted_conf.generation = self.generation.into();
tracing::info!("Observed configuration requires update.");
self.location_config(node_id, wanted_conf, None).await?;
if let Err(e) = self
.compute_hook
.notify(self.tenant_shard_id, node_id)
.await
{
tracing::warn!(
"Failed to notify compute of newly attached pageserver {node_id}: {e}"
);
}
self.compute_notify().await?;
}
}
}
@@ -461,6 +471,29 @@ impl Reconciler {
Ok(())
}
pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
// Whenever a particular Reconciler emits a notification, it is always notifying for the intended
// destination.
if let Some(node_id) = self.intent.attached {
let result = self
.compute_hook
.notify(self.tenant_shard_id, node_id, &self.cancel)
.await;
if let Err(e) = &result {
// It is up to the caller whether they want to drop out on this error, but they don't have to:
// in general we should avoid letting unavailability of the cloud control plane stop us from
// making progress.
tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;
}
result
} else {
Ok(())
}
}
}
pub(crate) fn attached_location_conf(

View File

@@ -12,6 +12,7 @@ use control_plane::attachment_service::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::{
control_api::{
@@ -27,6 +28,7 @@ use pageserver_api::{
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
generation::Generation,
@@ -36,7 +38,7 @@ use utils::{
};
use crate::{
compute_hook::ComputeHook,
compute_hook::{self, ComputeHook},
node::Node,
persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
scheduler::Scheduler,
@@ -66,6 +68,7 @@ struct ServiceState {
impl ServiceState {
fn new(
config: Config,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantState>,
@@ -73,7 +76,7 @@ impl ServiceState {
Self {
tenants,
nodes: Arc::new(nodes),
compute_hook: Arc::new(ComputeHook::new()),
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
}
}
@@ -82,8 +85,17 @@ impl ServiceState {
#[derive(Clone)]
pub struct Config {
// All pageservers managed by one instance of this service must have
// the same public key.
// the same public key. This JWT token will be used to authenticate
// this service to the pageservers it manages.
pub jwt_token: Option<String>,
// This JWT token will be used to authenticate this service to the control plane.
pub control_plane_jwt_token: Option<String>,
/// Where the compute hook should send notifications of pageserver attachment locations
/// (this URL points to the control plane in prod). If this is None, the compute hook will
/// assume it is running in a test environment and try to update neon_local.
pub compute_hook_url: Option<String>,
}
impl From<DatabaseError> for ApiError {
@@ -163,6 +175,8 @@ impl Service {
let mut cleanup = Vec::new();
let mut compute_notifications = Vec::new();
// Populate intent and observed states for all tenants, based on reported state on pageservers
let shard_count = {
let mut locked = self.inner.write().unwrap();
@@ -187,6 +201,13 @@ impl Service {
// not enough pageservers are available. The tenant may well still be available
// to clients.
tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
} else {
// If we're both intending and observed to be attached at a particular node, we will
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
if let Some(attached_at) = tenant_state.stably_attached() {
compute_notifications.push((*tenant_shard_id, attached_at));
}
}
}
@@ -235,10 +256,57 @@ impl Service {
}
}
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: we must complete these notification attempts before doing any other reconciliation for the
// tenants named here, because otherwise our calls to notify() might race with more recent values
// generated by reconciliation.
// Compute notify is fallible. If it fails here, do not delay overall startup: set the
// flag on these shards that they have a pending notification.
let compute_hook = self.inner.read().unwrap().compute_hook.clone();
// Construct an async stream of futures to invoke the compute notify function: we do this
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
let stream = futures::stream::iter(compute_notifications.into_iter())
.map(|(tenant_shard_id, node_id)| {
let compute_hook = compute_hook.clone();
async move {
// TODO: give Service a cancellation token for clean shutdown
let cancel = CancellationToken::new();
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
tracing::error!(
tenant_shard_id=%tenant_shard_id,
node_id=%node_id,
"Failed to notify compute on startup for shard: {e}"
);
Some(tenant_shard_id)
} else {
None
}
}
})
.buffered(compute_hook::API_CONCURRENCY);
let notify_results = stream.collect::<Vec<_>>().await;
// Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
{
let mut locked = self.inner.write().unwrap();
for tenant_shard_id in notify_results.into_iter().flatten() {
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
shard.pending_compute_notification = true;
}
}
}
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted.
// transient state before we restarted, or any tenants whose compute hooks failed above.
let reconcile_tasks = self.reconcile_all();
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
// normal operations may proceed.
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
}
@@ -295,6 +363,7 @@ impl Service {
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
pending_compute_notification: false,
};
tenants.insert(tenant_shard_id, new_tenant);
@@ -304,7 +373,10 @@ impl Service {
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
result_tx, nodes, tenants,
config.clone(),
result_tx,
nodes,
tenants,
))),
config,
persistence,
@@ -330,6 +402,10 @@ impl Service {
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
tenant.generation = std::cmp::max(tenant.generation, result.generation);
// If the reconciler signals that it failed to notify compute, set this state on
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
tenant.pending_compute_notification = result.pending_compute_notification;
match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {

View File

@@ -71,6 +71,12 @@ pub(crate) struct TenantState {
/// TODO: generalize to an array of recent events
/// TOOD: use a ArcSwap instead of mutex for faster reads?
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
/// sending it. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
}
#[derive(Default, Clone, Debug)]
@@ -164,6 +170,9 @@ pub(crate) struct ReconcileResult {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Generation,
pub(crate) observed: ObservedState,
/// Set [`TenantState::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
}
impl IntentState {
@@ -226,6 +235,7 @@ impl TenantState {
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
pending_compute_notification: false,
}
}
@@ -333,6 +343,38 @@ impl TenantState {
Ok(())
}
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
///
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
/// funciton should not be used to decide whether to reconcile.
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
if let Some(attach_intent) = self.intent.attached {
match self.observed.locations.get(&attach_intent) {
Some(loc) => match &loc.conf {
Some(conf) => match conf.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// Our intent and observed state agree that this node is in an attached state.
Some(attach_intent)
}
// Our observed config is not an attached state
_ => None,
},
// Our observed state is None, i.e. in flux
None => None,
},
// We have no observed state for this node
None => None,
}
} else {
// Our intent is not to attach
None
}
}
fn dirty(&self) -> bool {
if let Some(node_id) = self.intent.attached {
let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
@@ -354,6 +396,12 @@ impl TenantState {
}
}
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
if self.pending_compute_notification {
return true;
}
false
}
@@ -415,11 +463,13 @@ impl TenantState {
service_config: service_config.clone(),
cancel: cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
};
let reconcile_seq = self.sequence;
tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
let must_notify = self.pending_compute_notification;
let join_handle = tokio::task::spawn(async move {
// Wait for any previous reconcile task to complete before we start
if let Some(old_handle) = old_handle {
@@ -438,7 +488,16 @@ impl TenantState {
return;
}
// Attempt to make observed state match intent state
let result = reconciler.reconcile().await;
// If we know we had a pending compute notification from some previous action, send a notification irrespective
// of whether the above reconcile() did any work
if result.is_ok() && must_notify {
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
reconciler.compute_notify().await.ok();
}
result_tx
.send(ReconcileResult {
sequence: reconcile_seq,
@@ -446,6 +505,7 @@ impl TenantState {
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
pending_compute_notification: reconciler.compute_notify_failure,
})
.ok();
});

View File

@@ -457,6 +457,12 @@ impl AttachmentService {
args.push(format!("--public-key={public_key_path}"));
}
if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
args.push(format!(
"--compute-hook-url={control_plane_compute_hook_api}"
));
}
background_process::start_process(
COMMAND,
&self.env.base_data_dir,

View File

@@ -795,7 +795,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
endpoint.status(),
&format!("{}", endpoint.status()),
]);
}

View File

@@ -184,7 +184,7 @@ impl ComputeControlPlane {
v.tenant_id == tenant_id
&& v.timeline_id == timeline_id
&& v.mode == mode
&& v.status() != "stopped"
&& v.status() != EndpointStatus::Stopped
});
if let Some((key, _)) = duplicates.next() {
@@ -223,6 +223,26 @@ pub struct Endpoint {
features: Vec<ComputeFeature>,
}
#[derive(PartialEq, Eq)]
pub enum EndpointStatus {
Running,
Stopped,
Crashed,
RunningNoPidfile,
}
impl std::fmt::Display for EndpointStatus {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
Self::Running => "running",
Self::Stopped => "stopped",
Self::Crashed => "crashed",
Self::RunningNoPidfile => "running, no pidfile",
};
write!(writer, "{}", s)
}
}
impl Endpoint {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
@@ -380,16 +400,16 @@ impl Endpoint {
self.endpoint_path().join("pgdata")
}
pub fn status(&self) -> &str {
pub fn status(&self) -> EndpointStatus {
let timeout = Duration::from_millis(300);
let has_pidfile = self.pgdata().join("postmaster.pid").exists();
let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
match (has_pidfile, can_connect) {
(true, true) => "running",
(false, false) => "stopped",
(true, false) => "crashed",
(false, true) => "running, no pidfile",
(true, true) => EndpointStatus::Running,
(false, false) => EndpointStatus::Stopped,
(true, false) => EndpointStatus::Crashed,
(false, true) => EndpointStatus::RunningNoPidfile,
}
}
@@ -481,7 +501,7 @@ impl Endpoint {
remote_ext_config: Option<&String>,
shard_stripe_size: usize,
) -> Result<()> {
if self.status() == "running" {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
}

View File

@@ -72,11 +72,16 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
// Control plane location: if None, we will not run attachment_service. If set, this will
// Control plane upcall API for pageserver: if None, we will not run attachment_service. If set, this will
// be propagated into each pageserver's configuration.
#[serde(default)]
pub control_plane_api: Option<Url>,
// Control plane upcall API for attachment service. If set, this will be propagated into the
// attachment service's configuration.
#[serde(default)]
pub control_plane_compute_hook_api: Option<Url>,
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,

View File

@@ -21,7 +21,7 @@ We build all images after a successful `release` tests run and push automaticall
## Docker Compose example
You can see a [docker compose](https://docs.docker.com/compose/) example to create a neon cluster in [/docker-compose/docker-compose.yml](/docker-compose/docker-compose.yml). It creates the following conatainers.
You can see a [docker compose](https://docs.docker.com/compose/) example to create a neon cluster in [/docker-compose/docker-compose.yml](/docker-compose/docker-compose.yml). It creates the following containers.
- pageserver x 1
- safekeeper x 3
@@ -38,7 +38,7 @@ You can specify version of neon cluster using following environment values.
- TAG: the tag version of [docker image](https://registry.hub.docker.com/r/neondatabase/neon/tags) (default is latest), which is tagged in [CI test](/.github/workflows/build_and_test.yml)
```
$ cd docker-compose/
$ docker-compose down # remove the conainers if exists
$ docker-compose down # remove the containers if exists
$ PG_VERSION=15 TAG=2937 docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating docker-compose_storage_broker_1 ... done

View File

@@ -64,7 +64,7 @@ Storage.
The LayerMap tracks what layers exist in a timeline.
Currently, the layer map is just a resizeable array (Vec). On a GetPage@LSN or
Currently, the layer map is just a resizable array (Vec). On a GetPage@LSN or
other read request, the layer map scans through the array to find the right layer
that contains the data for the requested page. The read-code in LayeredTimeline
is aware of the ancestor, and returns data from the ancestor timeline if it's

View File

@@ -22,7 +22,7 @@ timeline to shutdown. It will also wait for them to finish.
A task registered in the task registry can check if it has been
requested to shut down, by calling `is_shutdown_requested()`. There's
also a `shudown_watcher()` Future that can be used with `tokio::select!`
also a `shutdown_watcher()` Future that can be used with `tokio::select!`
or similar, to wake up on shutdown.

View File

@@ -74,4 +74,4 @@ somewhat wasteful, but because most WAL records only affect one page,
the overhead is acceptable.
The WAL redo always happens for one particular page. If the WAL record
coantains changes to other pages, they are ignored.
contains changes to other pages, they are ignored.

View File

@@ -0,0 +1,420 @@
# Splitting cloud console
Created on 17.06.2022
## Summary
Currently we have `cloud` repository that contains code implementing public API for our clients as well as code for managing storage and internal infrastructure services. We can split everything user-related from everything storage-related to make it easier to test and maintain.
This RFC proposes to introduce a new control-plane service with HTTP API. The overall architecture will look like this:
```markup
. x
external area x internal area
(our clients) x (our services)
x
x ┌───────────────────────┐
x ┌───────────────┐ > ┌─────────────────────┐ │ Storage (EC2) │
x │ console db │ > │ control-plane db │ │ │
x └───────────────┘ > └─────────────────────┘ │ - safekeepers │
x ▲ > ▲ │ - pageservers │
x │ > │ │ │
┌──────────────────┐ x ┌───────┴───────┐ > │ │ Dependencies │
│ browser UI ├──►│ │ > ┌──────────┴──────────┐ │ │
└──────────────────┘ x │ │ > │ │ │ - etcd │
x │ console ├───────►│ control-plane ├────►│ - S3 │
┌──────────────────┐ x │ │ > │ (deployed in k8s) │ │ - more? │
│public API clients├──►│ │ > │ │ │ │
└──────────────────┘ x └───────┬───────┘ > └──────────┬──────────┘ └───────────────────────┘
x │ > ▲ │ ▲
x │ > │ │ │
x ┌───────┴───────┐ > │ │ ┌───────────┴───────────┐
x │ dependencies │ > │ │ │ │
x │- analytics │ > │ └───────────────►│ computes │
x │- auth │ > │ │ (deployed in k8s) │
x │- billing │ > │ │ │
x └───────────────┘ > │ └───────────────────────┘
x > │ ▲
x > ┌─────┴───────────────┐ │
┌──────────────────┐ x > │ │ │
│ │ x > │ proxy ├─────────────────┘
│ postgres ├───────────────────────────►│ (deployed in k8s) │
│ users │ x > │ │
│ │ x > └─────────────────────┘
└──────────────────┘ x >
>
>
closed-source > open-source
>
>
```
Notes:
- diagram is simplified in the less-important places
- directed arrows are strict and mean that connections in the reverse direction are forbidden
This split is quite complex and this RFC proposes several smaller steps to achieve the larger goal:
1. Start by refactoring the console code, the goal is to have console and control-plane code in the different directories without dependencies on each other.
2. Do similar refactoring for tables in the console database, remove queries selecting data from both console and control-plane; move control-plane tables to a separate database.
3. Implement control-plane HTTP API serving on a separate TCP port; make all console→control-plane calls to go through that HTTP API.
4. Move control-plane source code to the neon repo; start control-plane as a separate service.
## Motivation
These are the two most important problems we want to solve:
- Publish open-source implementation of all our cloud/storage features
- Make a unified control-plane that is used in all cloud (serverless) and local (tests) setups
Right now we have some closed-source code in the cloud repo. That code contains implementation for running Neon computes in k8s and without that code its impossible to automatically scale PostgreSQL computes. That means that we dont have an open-source serverless PostgreSQL at the moment.
After splitting and open-sourcing control-plane service we will have source code and Docker images for all storage services. That control-plane service should have HTTP API for creating and managing tenants (including all our storage features), while proxy will listen for incoming connections and create computes on-demand.
Improving our test suite is an important task, but requires a lot of prerequisites and may require a separate RFC. Possible implementation of that is described in the section [Next steps](#next-steps).
Another piece of motivation can be a better involvement of storage development team into a control-plane. By splitting control-plane from the console, it can be more convenient to test and develop control-plane with paying less attention to “business” features, such as user management, billing and analytics.
For example, console currently requires authentication providers such as GitHub OAuth to work at all, as well as nodejs to be able to build it locally. It will be more convenient to build and run it locally without these requirements.
## Proposed implementation
### Current state of things
Lets start with defining the current state of things at the moment of this proposal. We have three repositories containing source code:
- open-source `postgres` — our fork of postgres
- open-source `neon` — our main repository for storage source code
- closed-source `cloud` — mostly console backend and UI frontend
This proposal aims not to change anything at the existing code in `neon` and `postgres` repositories, but to create control-plane service and move its source code from `cloud` to the `neon` repository. That means that we need to split code in `cloud` repo only, and will consider only this repository for exploring its source code.
Lets look at the miscellaneous things in the `cloud` repo which are NOT part of the console application, i.e. NOT the Go source code that is compiled to the `./console` binary. There we have:
- command-line tools, such as cloudbench, neonadmin
- markdown documentation
- cloud operations scripts (helm, terraform, ansible)
- configs and other things
- e2e python tests
- incidents playbooks
- UI frontend
- Make build scripts, code generation scripts
- database migrations
- swagger definitions
And also lets take a look at what we have in the console source code, which is the service wed like to split:
- API Servers
- Public API v2
- Management API v2
- Public API v1
- Admin API v1 (same port as Public API v1)
- Management API v1
- Workers
- Monitor Compute Activity
- Watch Failed Operations
- Availability Checker
- Business Metrics Collector
- Internal Services
- Auth Middleware, UserIsAdmin, Cookies
- Cable Websocket Server
- Admin Services
- Global Settings, Operations, Pageservers, Platforms, Projects, Safekeepers, Users
- Authenticate Proxy
- API Keys
- App Controller, serving UI HTML
- Auth Controller
- Branches
- Projects
- Psql Connect + Passwordless login
- Users
- Cloud Metrics
- User Metrics
- Invites
- Pageserver/Safekeeper management
- Operations, k8s/docker/common logic
- Platforms, Regions
- Project State
- Projects Roles, SCRAM
- Global Settings
- Other things
- segment analytics integration
- sentry integration
- other common utilities packages
### Drawing the splitting line
The most challenging and the most important thing is to define the line that will split new control-plane service from the existing cloud service. If we dont get it right, then we can end up with having a lot more issues without many benefits.
We propose to define that line as follows:
- everything user-related stays in the console service
- everything storage-related should be in the control-plane service
- something that falls in between should be decided where to go, but most likely should stay in the console service
- some similar parts should be in both services, such as admin/management/db_migrations
We call user-related all requests that can be connected to some user. The general idea is dont have any user_id in the control-plane service and operate exclusively on tenant_id+timeline_id, the same way as existing storage services work now (compute, safekeeper, pageserver).
Storage-related things can be defined as doing any of the following:
- using k8s API
- doing requests to any of the storage services (proxy, compute, safekeeper, pageserver, etc..)
- tracking current status of tenants/timelines, managing lifetime of computes
Based on that idea, we can say that new control-plane service should have the following components:
- single HTTP API for everything
- Create and manage tenants and timelines
- Manage global settings and storage configuration (regions, platforms, safekeepers, pageservers)
- Admin API for storage health inspection and debugging
- Workers
- Monitor Compute Activity
- Watch Failed Operations
- Availability Checker
- Internal Services
- Admin Services
- Global Settings, Operations, Pageservers, Platforms, Tenants, Safekeepers
- Authenticate Proxy
- Branches
- Psql Connect
- Cloud Metrics
- Pageserver/Safekeeper management
- Operations, k8s/docker/common logic
- Platforms, Regions
- Tenant State
- Compute Roles, SCRAM
- Global Settings
---
And other components should probably stay in the console service:
- API Servers (no changes here)
- Public API v2
- Management API v2
- Public API v1
- Admin API v1 (same port as Public API v1)
- Management API v1
- Workers
- Business Metrics Collector
- Internal Services
- Auth Middleware, UserIsAdmin, Cookies
- Cable Websocket Server
- Admin Services
- Users admin stays the same
- Other admin services can redirect requests to the control-plane
- API Keys
- App Controller, serving UI HTML
- Auth Controller
- Projects
- User Metrics
- Invites
- Users
- Passwordless login
- Other things
- segment analytics integration
- sentry integration
- other common utilities packages
There are also miscellaneous things that are useful for all kinds of services. So we can say that these things can be in both services:
- markdown documentation
- e2e python tests
- make build scripts, code generation scripts
- database migrations
- swagger definitions
The single entrypoint to the storage should be control-plane API. After we define that API, we can have code-generated implementation for the client and for the server. The general idea is to move code implementing storage components from the console to the API implementation inside the new control-plane service.
After the code is moved to the new service, we can fill the created void by making API calls to the new service:
- authorization of the client
- mapping user_id + project_id to the tenant_id
- calling the control-plane API
### control-plane API
Currently we have the following projects API in the console:
```
GET /projects/{project_id}
PATCH /projects/{project_id}
POST /projects/{project_id}/branches
GET /projects/{project_id}/databases
POST /projects/{project_id}/databases
GET /projects/{project_id}/databases/{database_id}
PUT /projects/{project_id}/databases/{database_id}
DELETE /projects/{project_id}/databases/{database_id}
POST /projects/{project_id}/delete
GET /projects/{project_id}/issue_token
GET /projects/{project_id}/operations
GET /projects/{project_id}/operations/{operation_id}
POST /projects/{project_id}/query
GET /projects/{project_id}/roles
POST /projects/{project_id}/roles
GET /projects/{project_id}/roles/{role_name}
DELETE /projects/{project_id}/roles/{role_name}
POST /projects/{project_id}/roles/{role_name}/reset_password
POST /projects/{project_id}/start
POST /projects/{project_id}/stop
POST /psql_session/{psql_session_id}
```
It looks fine and we probably already have clients relying on it. So we should not change it, at least for now. But most of these endpoints (if not all) are related to storage, and it can suggest us what control-plane API should look like:
```
GET /tenants/{tenant_id}
PATCH /tenants/{tenant_id}
POST /tenants/{tenant_id}/branches
GET /tenants/{tenant_id}/databases
POST /tenants/{tenant_id}/databases
GET /tenants/{tenant_id}/databases/{database_id}
PUT /tenants/{tenant_id}/databases/{database_id}
DELETE /tenants/{tenant_id}/databases/{database_id}
POST /tenants/{tenant_id}/delete
GET /tenants/{tenant_id}/issue_token
GET /tenants/{tenant_id}/operations
GET /tenants/{tenant_id}/operations/{operation_id}
POST /tenants/{tenant_id}/query
GET /tenants/{tenant_id}/roles
POST /tenants/{tenant_id}/roles
GET /tenants/{tenant_id}/roles/{role_name}
DELETE /tenants/{tenant_id}/roles/{role_name}
POST /tenants/{tenant_id}/roles/{role_name}/reset_password
POST /tenants/{tenant_id}/start
POST /tenants/{tenant_id}/stop
POST /psql_session/{psql_session_id}
```
One of the options here is to use gRPC instead of the HTTP, which has some useful features, but there are some strong points towards using plain HTTP:
- HTTP API is easier to use for the clients
- we already have HTTP API in pageserver/safekeeper/console
- we probably want control-plane API to be similar to the console API, available in the cloud
### Getting updates from the storage
There can be some valid cases, when we would like to know what is changed in the storage. For example, console might want to know when user has queried and started compute and when compute was scaled to zero after that, to know how much user should pay for the service. Another example is to get info about reaching the disk space limits. Yet another example is to do analytics, such as how many users had at least one active project in a month.
All of the above cases can happen without using the console, just by accessing compute through the proxy.
To solve this, we can have a log of events occurring in the storage (event logs). That is very similar to operations table we have right now, the only difference is that events are immutable and we cannot change them after saving to the database. For example, we might want to have events for the following activities:
- We finished processing some HTTP API query, such as resetting the password
- We changed some state, such as started or stopped a compute
- Operation is created
- Operation is started for the first time
- Operation is failed for the first time
- Operation is finished
Once we save these events to the database, we can create HTTP API to subscribe to these events. That API can look like this:
```
GET /events/<cursor>
{
"events": [...],
"next_cursor": 123
}
```
It should be possible to replay event logs from some point of time, to get a state of almost anything from the storage services. That means that if we maintain some state in the control-plane database and we have a reason to have the same state in the console database, it is possible by polling events from the control-plane API and changing the state in the console database according to the events.
### Next steps
After implementing control-plane HTTP API and starting control-plane as a separate service, we might want to think of exploiting benefits of the new architecture, such as reorganizing test infrastructure. Possible options are listed in the [Next steps](#next-steps-1).
## Non Goals
RFC doesnt cover the actual cloud deployment scripts and schemas, such as terraform, ansible, k8s yamls and so on.
## Impacted components
Mostly console, but can also affect some storage service.
## Scalability
We should support starting several instances of the new control-plane service at the same time.
At the same time, it should be possible to use only single instance of control-plane, which can be useful for local tests.
## Security implications
New control-plane service is an internal service, so no external requests can reach it. But at the same time, it contains API to do absolutely anything with any of the tenants. That means that bad internal actor can potentially read and write all of the tenants. To make this safer, we can have one of these:
- Simple option is to protect all requests with a single private key, so that no one can make requests without having that one key.
- Another option is to have a separate token for every tenant and store these tokens in another secure place. This way its harder to access all tenants at once, because they have the different tokens.
## Alternative implementation
There was an idea to create a k8s operator for managing storage services and computes, but author of this RFC is not really familiar with it.
Regarding less alternative ideas, there are another options for the name of the new control-plane service:
- storage-ctl
- cloud
- cloud-ctl
## Pros/cons of proposed approaches (TODO)
Pros:
- All storage features are completely open-source
- Better tests coverage, less difference between cloud and local setups
- Easier to develop storage and cloud features, because there is no need to setup console for that
- Easier to deploy storage-only services to the any cloud
Cons:
- All storage features are completely open-source
- Distributed services mean more code to connect different services and potential network issues
- Console needs to have a dependency on storage API, there can be complications with developing new feature in a branch
- More code to JOIN data from different services (console and control-plane)
## Definition of Done
We have a new control-plane service running in the k8s. Source code for that control-plane service is located in the open-source neon repo.
## Next steps
After weve reached DoD, we can make further improvements.
First thing that can benefit from the split is local testing. The same control-plane service can implement starting computes as a local processes instead of k8s deployments. If it will also support starting pageservers/safekeepers/proxy for the local setup, then it can completely replace `./neon_local` binary, which is currently used for testing. The local testing environment can look like this:
```
┌─────────────────────┐ ┌───────────────────────┐
│ │ │ Storage (local) │
│ control-plane db │ │ │
│ (local process) │ │ - safekeepers │
│ │ │ - pageservers │
└──────────▲──────────┘ │ │
│ │ Dependencies │
┌──────────┴──────────┐ │ │
│ │ │ - etcd │
│ control-plane ├────►│ - S3 │
│ (local process) │ │ - more? │
│ │ │ │
└──────────┬──────────┘ └───────────────────────┘
▲ │ ▲
│ │ │
│ │ ┌───────────┴───────────┐
│ │ │ │
│ └───────────────►│ computes │
│ │ (local processes) │
│ │ │
┌──────┴──────────────┐ └───────────────────────┘
│ │ ▲
│ proxy │ │
│ (local process) ├─────────────────┘
│ │
└─────────────────────┘
```
The key thing here is that control-plane local service have the same API and almost the same implementation as the one deployed in the k8s. This allows to run the same e2e tests against both cloud and local setups.
For the python test_runner tests everything can stay mostly the same. To do that, we just need to replace `./neon_local` cli commands with API calls to the control-plane.
The benefit here will be in having fast local tests that are really close to our cloud setup. Bugs in k8s queries are still cannot be found when running computes as a local processes, but it should be really easy to start k8s locally (for example in k3s) and run the same tests with control-plane connected to the local k8s.
Talking about console and UI tests, after the split there should be a way to test these without spinning up all the storage locally. New control-plane service has a well-defined API, allowing us to mock it. This way we can create UI tests to verify the right calls are issued after specific UI interactions and verify that we render correct messages when API returns errors.

View File

@@ -78,7 +78,7 @@ with grpc streams and tokio mpsc channels. The implementation description is at
It is just 500 lines of code and core functionality is complete. 1-1 pub sub
gives about 120k received messages per second; having multiple subscribers in
different connecitons quickly scales to 1 million received messages per second.
different connections quickly scales to 1 million received messages per second.
I had concerns about many concurrent streams in singe connection, but 2^20
subscribers still work (though eat memory, with 10 publishers 20GB are consumed;
in this implementation each publisher holds full copy of all subscribers). There
@@ -95,12 +95,12 @@ other members, with best-effort this is simple.
### Security implications
Communication happens in a private network that is not exposed to users;
additionaly we can add auth to the broker.
additionally we can add auth to the broker.
## Alternative: get existing pub-sub
We could take some existing pub sub solution, e.g. RabbitMQ, Redis. But in this
case IMV simplicity of our own outweights external dependency costs (RabbitMQ is
case IMV simplicity of our own outweighs external dependency costs (RabbitMQ is
much more complicated and needs VM; Redis Rust client maintenance is not
ideal...). Also note that projects like CockroachDB and TiDB are based on gRPC
as well.

View File

@@ -74,7 +74,7 @@ TenantMaintenanceGuard: Like ActiveTenantGuard, but can be held even when the
tenant is not in Active state. Used for operations like attach/detach. Perhaps
allow only one such guard on a Tenant at a time.
Similarly for Timelines. We don't currentl have a "state" on Timeline, but I think
Similarly for Timelines. We don't currently have a "state" on Timeline, but I think
we need at least two states: Active and Stopping. The Stopping state is used at
deletion, to prevent new TimelineActiveGuards from appearing, while you wait for
existing TimelineActiveGuards to die out.
@@ -85,7 +85,7 @@ have a TenantActiveGuard, and the tenant's state changes from Active to
Stopping, the is_shutdown_requested() function should return true, and
shutdown_watcher() future should return.
This signaling doesn't neessarily need to cover all cases. For example, if you
This signaling doesn't necessarily need to cover all cases. For example, if you
have a block of code in spawn_blocking(), it might be acceptable if
is_shutdown_requested() doesn't return true even though the tenant is in
Stopping state, as long as the code finishes reasonably fast.

View File

@@ -37,7 +37,7 @@ sequenceDiagram
```
At this point it is not possible to restore from index, it contains L2 which
is no longer available in s3 and doesnt contain L3 added by compaction by the
is no longer available in s3 and doesn't contain L3 added by compaction by the
first pageserver. So if any of the pageservers restart initial sync will fail
(or in on-demand world it will fail a bit later during page request from
missing layer)
@@ -74,7 +74,7 @@ One possible solution for relocation case is to orchestrate background jobs
from outside. The oracle who runs migration can turn off background jobs on
PS1 before migration and then run migration -> enable them on PS2. The problem
comes if migration fails. In this case in order to resume background jobs
oracle needs to guarantee that PS2 doesnt run background jobs and if it doesnt
oracle needs to guarantee that PS2 doesn't run background jobs and if it doesn't
respond then PS1 is stuck unable to run compaction/gc. This cannot be solved
without human ensuring that no upload from PS2 can happen. In order to be able
to resolve this automatically CAS is required on S3 side so pageserver can
@@ -128,7 +128,7 @@ During discussion it seems that we converged on the approach consisting of:
whether we need to apply change to the index state or not.
- Responsibility for running background jobs is assigned externally. Pageserver
keeps locally persistent flag for each tenant that indicates whether this
pageserver is considered as primary one or not. TODO what happends if we
pageserver is considered as primary one or not. TODO what happens if we
crash and cannot start for some extended period of time? Control plane can
assign ownership to some other pageserver. Pageserver needs some way to check
if its still the blessed one. Maybe by explicit request to control plane on
@@ -138,7 +138,7 @@ Requirement for deterministic layer generation was considered overly strict
because of two reasons:
- It can limit possible optimizations e g when pageserver wants to reshuffle
some data locally and doesnt want to coordinate this
some data locally and doesn't want to coordinate this
- The deterministic algorithm itself can change so during deployments for some
time there will be two different version running at the same time which can
cause non determinism
@@ -164,7 +164,7 @@ sequenceDiagram
CP->>PS1: Yes
deactivate CP
PS1->>S3: Fetch PS1 index.
note over PS1: Continue operations, start backround jobs
note over PS1: Continue operations, start background jobs
note over PS1,PS2: PS1 starts up and still and is not a leader anymore
PS1->>CP: Am I still the leader for Tenant X?
CP->>PS1: No
@@ -203,7 +203,7 @@ sequenceDiagram
### Eviction
When two pageservers operate on a tenant for extended period of time follower
doesnt perform write operations in s3. When layer is evicted follower relies
doesn't perform write operations in s3. When layer is evicted follower relies
on updates from primary to get info about layers it needs to cover range for
evicted layer.

View File

@@ -4,7 +4,7 @@ Created on 08.03.23
## Motivation
Currently we dont delete pageserver part of the data from s3 when project is deleted. (The same is true for safekeepers, but this outside of the scope of this RFC).
Currently we don't delete pageserver part of the data from s3 when project is deleted. (The same is true for safekeepers, but this outside of the scope of this RFC).
This RFC aims to spin a discussion to come to a robust deletion solution that wont put us in into a corner for features like postponed deletion (when we keep data for user to be able to restore a project if it was deleted by accident)
@@ -75,9 +75,9 @@ Remote one is needed for cases when pageserver is lost during deletion so other
Why local mark file is needed?
If we dont have one, we have two choices, delete local data before deleting the remote part or do that after.
If we don't have one, we have two choices, delete local data before deleting the remote part or do that after.
If we delete local data before remote then during restart pageserver wont pick up remote tenant at all because nothing is available locally (pageserver looks for remote conuterparts of locally available tenants).
If we delete local data before remote then during restart pageserver wont pick up remote tenant at all because nothing is available locally (pageserver looks for remote counterparts of locally available tenants).
If we delete local data after remote then at the end of the sequence when remote mark file is deleted if pageserver restart happens then the state is the same to situation when pageserver just missing data on remote without knowing the fact that this data is intended to be deleted. In this case the current behavior is upload everything local-only to remote.
@@ -145,7 +145,7 @@ sequenceDiagram
CP->>PS: Retry delete tenant
PS->>CP: Not modified
else Mark is missing
note over PS: Continue to operate the tenant as if deletion didnt happen
note over PS: Continue to operate the tenant as if deletion didn't happen
note over CP: Eventually console should <br> retry delete request
@@ -168,7 +168,7 @@ sequenceDiagram
PS->>CP: True
```
Similar sequence applies when both local and remote marks were persisted but Control Plane still didnt receive a response.
Similar sequence applies when both local and remote marks were persisted but Control Plane still didn't receive a response.
If pageserver crashes after both mark files were deleted then it will reply to control plane status poll request with 404 which should be treated by control plane as success.
@@ -187,7 +187,7 @@ If pageseserver is lost then the deleted tenant should be attached to different
##### Restrictions for tenant that is in progress of being deleted
I propose to add another state to tenant/timeline - PendingDelete. This state shouldnt allow executing any operations aside from polling the deletion status.
I propose to add another state to tenant/timeline - PendingDelete. This state shouldn't allow executing any operations aside from polling the deletion status.
#### Summary
@@ -237,7 +237,7 @@ New branch gets created
PS1 starts up (is it possible or we just recycle it?)
PS1 is unaware of the new branch. It can either fall back to s3 ls, or ask control plane.
So here comes the dependency of storage on control plane. During restart storage needs to know which timelines are valid for operation. If there is nothing on s3 that can answer that question storage neeeds to ask control plane.
So here comes the dependency of storage on control plane. During restart storage needs to know which timelines are valid for operation. If there is nothing on s3 that can answer that question storage needs to ask control plane.
### Summary
@@ -250,7 +250,7 @@ Cons:
Pros:
- Easier to reason about if you dont have to account for pageserver restarts
- Easier to reason about if you don't have to account for pageserver restarts
### Extra notes
@@ -262,7 +262,7 @@ Delayed deletion can be done with both approaches. As discussed with Anna (@step
After discussion in comments I see that we settled on two options (though a bit different from ones described in rfc). First one is the same - pageserver owns as much as possible. The second option is that pageserver owns markers thing, but actual deletion happens in control plane by repeatedly calling ls + delete.
To my mind the only benefit of the latter approach is possible code reuse between safekeepers and pageservers. Otherwise poking around integrating s3 library into control plane, configuring shared knowledge abouth paths in s3 - are the downsides. Another downside of relying on control plane is the testing process. Control plane resides in different repository so it is quite hard to test pageserver related changes there. e2e test suite there doesnt support shutting down pageservers, which are separate docker containers there instead of just processes.
To my mind the only benefit of the latter approach is possible code reuse between safekeepers and pageservers. Otherwise poking around integrating s3 library into control plane, configuring shared knowledge about paths in s3 - are the downsides. Another downside of relying on control plane is the testing process. Control plane resides in different repository so it is quite hard to test pageserver related changes there. e2e test suite there doesn't support shutting down pageservers, which are separate docker containers there instead of just processes.
With pageserver owning everything we still give the retry logic to control plane but its easier to duplicate if needed compared to sharing inner s3 workings. We will have needed tests for retry logic in neon repo.

View File

@@ -75,7 +75,7 @@ sequenceDiagram
```
At this point it is not possible to restore the state from index, it contains L2 which
is no longer available in s3 and doesnt contain L3 added by compaction by the
is no longer available in s3 and doesn't contain L3 added by compaction by the
first pageserver. So if any of the pageservers restart, initial sync will fail
(or in on-demand world it will fail a bit later during page request from
missing layer)
@@ -171,7 +171,7 @@ sequenceDiagram
Another problem is a possibility of concurrent branch creation calls.
I e during migration create_branch can be called on old pageserver and newly created branch wont be seen on new pageserver. Prior art includes prototyping an approach of trying to mirror such branches, but currently it lost its importance, because now attach is fast because we dont need to download all data, and additionally to the best of my knowledge of control plane internals (cc @ololobus to confirm) operations on one project are executed sequentially, so it is not possible to have such case. So branch create operation will be executed only when relocation is completed. As a safety measure we can forbid branch creation for tenants that are in readonly remote state.
I e during migration create_branch can be called on old pageserver and newly created branch wont be seen on new pageserver. Prior art includes prototyping an approach of trying to mirror such branches, but currently it lost its importance, because now attach is fast because we don't need to download all data, and additionally to the best of my knowledge of control plane internals (cc @ololobus to confirm) operations on one project are executed sequentially, so it is not possible to have such case. So branch create operation will be executed only when relocation is completed. As a safety measure we can forbid branch creation for tenants that are in readonly remote state.
## Simplistic approach

View File

@@ -55,7 +55,7 @@ When PostgreSQL requests a file, `compute_ctl` downloads it.
PostgreSQL requests files in the following cases:
- When loading a preload library set in `local_preload_libraries`
- When explicitly loading a library with `LOAD`
- Wnen creating extension with `CREATE EXTENSION` (download sql scripts, (optional) extension data files and (optional) library files)))
- When creating extension with `CREATE EXTENSION` (download sql scripts, (optional) extension data files and (optional) library files)))
#### Summary

View File

@@ -26,7 +26,7 @@ plane guarantee prevents robust response to failures, as if a pageserver is unre
we may not detach from it. The mechanism in this RFC fixes this, by making it safe to
attach to a new, different pageserver even if an unresponsive pageserver may be running.
Futher, lack of safety during split-brain conditions blocks two important features where occasional
Further lack of safety during split-brain conditions blocks two important features where occasional
split-brain conditions are part of the design assumptions:
- seamless tenant migration ([RFC PR](https://github.com/neondatabase/neon/pull/5029))
@@ -490,11 +490,11 @@ The above makes it safe for control plane to change the assignment of
tenant to pageserver in control plane while a timeline creation is ongoing.
The reason is that the creation request against the new assigned pageserver
uses a new generation number. However, care must be taken by control plane
to ensure that a "timeline creation successul" response from some pageserver
to ensure that a "timeline creation successful" response from some pageserver
is checked for the pageserver's generation for that timeline's tenant still being the latest.
If it is not the latest, the response does not constitute a successful timeline creation.
It is acceptable to discard such responses, the scrubber will clean up the S3 state.
It is better to issue a timelien deletion request to the stale attachment.
It is better to issue a timeline deletion request to the stale attachment.
#### Timeline Deletion
@@ -633,7 +633,7 @@ As outlined in the Part 1 on correctness, it is critical that deletions are only
executed once the key is not referenced anywhere in S3.
This property is obviously upheld by the scheme above.
#### We Accept Object Leakage In Acceptable Circumcstances
#### We Accept Object Leakage In Acceptable Circumstances
If we crash in the flow above between (2) and (3), we lose track of unreferenced object.
Further, enqueuing a single to the persistent queue may not be durable immediately to amortize cost of flush to disk.

View File

@@ -162,7 +162,7 @@ struct Tenant {
...
txns: HashMap<TxnId, Transaction>,
// the most recently started txn's id; only most recently sarted can win
// the most recently started txn's id; only most recently started can win
next_winner_txn: Option<TxnId>,
}
struct Transaction {
@@ -186,7 +186,7 @@ A transaction T in state Committed has subsequent transactions that may or may n
So, for garbage collection, we need to assess transactions in state Committed and RejectAcknowledged:
- Commited: delete objects on the deadlist.
- Committed: delete objects on the deadlist.
- We dont need a LIST request here, the deadlist is sufficient. So, its really cheap.
- This is **not true MVCC garbage collection**; by deleting the objects on Committed transaction T s deadlist, we might delete data referenced by other transactions that were concurrent with T, i.e., they started while T was still open. However, the fact that T is committed means that the other transactions are RejectPending or RejectAcknowledged, so, they dont matter. Pageservers executing these doomed RejectPending transactions must handle 404 for GETs gracefully, e.g., by trying to commit txn so they observe the rejection theyre destined to get anyways. 404s for RejectAcknowledged is handled below.
- RejectAcknowledged: delete all objects created in that txn, and discard deadlists.
@@ -242,15 +242,15 @@ If a pageserver is unresponsive from Control Planes / Computes perspective
At this point, availability is restored and user pain relieved.
Whats left is to somehow close the doomed transaction of the unresponsive pageserver, so that it beomes RejectAcknowledged, and GC can make progress. Since S3 is cheap, we can afford to wait a really long time here, especially if we put a soft bound on the amount of data a transaction may produce before it must commit. Procedure:
Whats left is to somehow close the doomed transaction of the unresponsive pageserver, so that it becomes RejectAcknowledged, and GC can make progress. Since S3 is cheap, we can afford to wait a really long time here, especially if we put a soft bound on the amount of data a transaction may produce before it must commit. Procedure:
1. Ensure the unresponsive pageserver is taken out of rotation for new attachments. That probably should happen as part of the routine above.
2. Make a human operator investigate decide what to do (next morning, NO ONCALL ALERT):
1. Inspect the instance, investigate logs, understand root cause.
2. Try to re-establish connectivity between pageserver and Control Plane so that pageserver can retry commits, get rejected, ack rejection ⇒ enable GC.
3. Use below procedure to decomission pageserver.
3. Use below procedure to decommission pageserver.
### Decomissioning A Pageserver (Dead or Alive-but-Unrespsonive)
### Decommissioning A Pageserver (Dead or Alive-but-Unresponsive)
The solution, enabled by this proposal:
@@ -310,7 +310,7 @@ Issues that we discussed:
1. In abstract terms, this proposal provides a linearized history for a given S3 prefix.
2. In concrete terms, this proposal provides a linearized history per tenant.
3. There can be multiple writers at a given time, but only one of them will win to become part of the linearized history.
4. ************************************************************************************Alternative ideas mentioned during meetings that should be turned into a written prospoal like this one:************************************************************************************
4. ************************************************************************************Alternative ideas mentioned during meetings that should be turned into a written proposal like this one:************************************************************************************
1. @Dmitry Rodionov : having linearized storage of index_part.json in some database that allows serializable transactions / atomic compare-and-swap PUT
2. @Dmitry Rodionov :
3. @Stas : something like this scheme, but somehow find a way to equate attachment duration with transaction duration, without losing work if pageserver dies months after attachment.

View File

@@ -54,7 +54,7 @@ If the compaction algorithm doesn't change between the two compaction runs, is d
*However*:
1. the file size of the overwritten L1s may not be identical, and
2. the bit pattern of the overwritten L1s may not be identical, and,
3. in the future, we may want to make the compaction code non-determinstic, influenced by past access patterns, or otherwise change it, resulting in L1 overwrites with a different set of delta records than before the overwrite
3. in the future, we may want to make the compaction code non-deterministic, influenced by past access patterns, or otherwise change it, resulting in L1 overwrites with a different set of delta records than before the overwrite
The items above are a problem for the [split-brain protection RFC](https://github.com/neondatabase/neon/pull/4919) because it assumes that layer files in S3 are only ever deleted, but never replaced (overPUTted).
@@ -63,7 +63,7 @@ But node B based its world view on the version of node A's `index_part.json` fro
That earlier `index_part.json`` contained the file size of the pre-overwrite L1.
If the overwritten L1 has a different file size, node B will refuse to read data from the overwritten L1.
Effectively, the data in the L1 has become inaccessible to node B.
If node B already uploaded an index part itself, all subsequent attachments will use node B's index part, and run into the same probem.
If node B already uploaded an index part itself, all subsequent attachments will use node B's index part, and run into the same problem.
If we ever introduce checksums instead of checking just the file size, then a mismatching bit pattern (2) will cause similar problems.
@@ -121,7 +121,7 @@ Multi-object changes that previously created and removed files in timeline dir a
* atomic `index_part.json` update in S3, as per guarantee that S3 PUT is atomic
* local timeline dir state:
* irrelevant for layer map content => irrelevant for atomic updates / crash consistency
* if we crash after index part PUT, local layer files will be used, so, no on-demand downloads neede for them
* if we crash after index part PUT, local layer files will be used, so, no on-demand downloads needed for them
* if we crash before index part PUT, local layer files will be deleted
## Trade-Offs
@@ -140,7 +140,7 @@ Assuming upload queue allows for unlimited queue depth (that's what it does toda
* wal ingest: currently unbounded
* L0 => L1 compaction: CPU time proportional to `O(sum(L0 size))` and upload work proportional to `O()`
* Compaction threshold is 10 L0s and each L0 can be up to 256M in size. Target size for L1 is 128M.
* In practive, most L0s are tiny due to 10minute `DEFAULT_CHECKPOINT_TIMEOUT`.
* In practice, most L0s are tiny due to 10minute `DEFAULT_CHECKPOINT_TIMEOUT`.
* image layer generation: CPU time `O(sum(input data))` + upload work `O(sum(new image layer size))`
* I have no intuition how expensive / long-running it is in reality.
* gc: `update_gc_info`` work (not substantial, AFAIK)
@@ -158,7 +158,7 @@ Pageserver crashes are very rare ; it would likely be acceptable to re-do the lo
However, regular pageserver restart happen frequently, e.g., during weekly deploys.
In general, pageserver restart faces the problem of tenants that "take too long" to shut down.
They are a problem because other tenants that shut down quickly are unavailble while we wait for the slow tenants to shut down.
They are a problem because other tenants that shut down quickly are unavailable while we wait for the slow tenants to shut down.
We currently allot 10 seconds for graceful shutdown until we SIGKILL the pageserver process (as per `pageserver.service` unit file).
A longer budget would expose tenants that are done early to a longer downtime.
A short budget would risk throwing away more work that'd have to be re-done after restart.
@@ -236,7 +236,7 @@ tenants/$tenant/timelines/$timeline/$key_and_lsn_range
tenants/$tenant/timelines/$timeline/$layer_file_id-$key_and_lsn_range
```
To guarantee uniqueness, the unqiue number is a sequence number, stored in `index_part.json`.
To guarantee uniqueness, the unique number is a sequence number, stored in `index_part.json`.
This alternative does not solve atomic layer map updates.
In our crash-during-compaction scenario above, the compaction run after the crash will not overwrite the L1s, but write/PUT new files with new sequence numbers.
@@ -246,11 +246,11 @@ We'd need to write a deduplication pass that checks if perfectly overlapping lay
However, this alternative is appealing because it systematically prevents overwrites at a lower level than this RFC.
So, this alternative is sufficient for the needs of the split-brain safety RFC (immutable layer files locally and in S3).
But it doesn't solve the problems with crash-during-compaction outlined earlier in this RFC, and in fact, makes it much more accute.
But it doesn't solve the problems with crash-during-compaction outlined earlier in this RFC, and in fact, makes it much more acute.
The proposed design in this RFC addresses both.
So, if this alternative sounds appealing, we should implement the proposal in this RFC first, then implement this alternative on top.
That way, we avoid a phase where the crash-during-compaction problem is accute.
That way, we avoid a phase where the crash-during-compaction problem is acute.
## Related issues

View File

@@ -596,4 +596,4 @@ pageservers are updated to be aware of it.
As well as simplifying implementation, putting heatmaps in S3 will be useful
for future analytics purposes -- gathering aggregated statistics on activity
pattersn across many tenants may be done directly from data in S3.
patterns across many tenants may be done directly from data in S3.

View File

@@ -147,7 +147,7 @@ Separating corrupt writes from non-corrupt ones is a hard problem in general,
and if the application was involved in making the corrupt write, a recovery
would also involve the application. Therefore, corruption that has made it into
the WAL is outside of the scope of this feature. However, the WAL replay can be
issued to right before the point in time where the corruption occured. Then the
issued to right before the point in time where the corruption occurred. Then the
data loss is isolated to post-corruption writes only.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
@@ -161,7 +161,7 @@ limits and billing we apply to existing timelines.
## Proposed implementation
The first problem to keep in mind is the reproducability of `initdb`.
The first problem to keep in mind is the reproducibility of `initdb`.
So an initial step would be to upload `initdb` snapshots to S3.
After that, we'd have the endpoint spawn a background process which

View File

@@ -69,7 +69,7 @@ However, unlike above, an ideal solution will
* This means, read each `DiskBtree` page at most once.
* Facilitate merging of the reads we issue to the OS and eventually NVMe.
Each of these items above represents a signficant amount of work.
Each of these items above represents a significant amount of work.
## Performance

View File

@@ -0,0 +1,135 @@
# Postgres aux files storage
In the current Neon architecture, compute nodes can only persist data through WAL replication to storage nodes. This method covers most of the data that Postgres persists, but not all. Exceptions include:
* Replication slots
* Postgres statistics files (`pg_stat/pgstat.stat`)
* `pg_stat_statements` disk state
* `pg_prewarm` disk state
It may also be beneficial to store observability data, such as compute metrics and page access histograms, in the per-endpoint storage.
## Aux file types
The mentioned files have different requirements regarding durability, exhibit different I/O patterns, vary in expected size, and have different desired behaviors with respect to point-in-time recovery and read-only replicas.
### Replication Slots
Replication slots persist data about the replication receiver on the replication source. In the case of chain replication, these files could be modified by a replica (because it may have another attached replica). The ability to modify this state on a replica is the main reason for this data to be kept in a separate file instead of in the usual relation (ignoring hint bits, replicas can't modify relation data).
* Stale file: Not allowed, as Postgres might have already pruned WAL or vacuumed old catalog entries (relevant in the case of logical replication).
* Lost file: Postgres will start, but replication will not be able to proceed.
* Point-in-time recovery (PITR): Resetting a slot to a previous point in time is meaningful in that it allows replication to continue from that point. However, it's hard to envision a use case where this would be useful, as the replica has to be reset in a similar fashion. Considering that a stale slot will cause WAL and catalog to grow without garbage collection, we decided to delete slots upon branch creation. This decision aligns with Postgres documentation's suggestion to omit `pg_replslot` from backups.
There are several types of files:
* Slot state, `pg_replslot/<slot_name>/state`, fixed size, about 200 bytes; updated on checkpoint.
* LogicalRewriteHeap, `pg_logical/mappings/map-*`, size varies with the size of the mapping; updated on checkpoint.
* SnapBuild state, `pg_logical/snapshots/*`, fixed structure plus an array of transaction IDs, size can reach up to kilobytes; updated on checkpoint or with running_xact WAL record.
* ReplicationOrigin, `pg_logical/replorigin_checkpoint`, fixed size, capped at 10 bytes; updated on checkpoint.
An important point is that these files could be modified on the replica in cases of chain replication or logical decoding at standby. Our approach with WAL logging comes at the expense of not being able to support chain replication and logical decoding at standby.
### PGStat: Postgres' cumulative statistics system
* Stale file: Not acceptable, as Postgres discards it on a dirty stop.
* Lost file: Postgres will work, but there may be a potential impact on query plans' quality and autovacuum performance.
* PITR: Resetting pgstat is sensible if we can retrieve a relevant version of the file. Since dump/restore operations are only performed on stop/start, we might need to modify Postgres code to dump the file periodically and handle an outdated file on start, which seems more practical than trying to WAL-log changes to stats.
Files are written only on shutdown and read only on startup. Their size depends on the number of relations and columns, typically ranging from 100KB to 1MB, but can exceed that. In my tests, the database had stats file around 1KB and it compressed 10x with gzip.
Stored in `pg_stat/pgstat.stat`.
### Files written by extensions
Potentially any extension can write files upon shutdown that it may want to retrieve upon startup.
PostgreSQL extensions, like pg_stat_statements, can manage persistent data across server restarts by writing to files during shutdown and reading from them during startup. This is achieved through PostgreSQL's support for extension hooks and background workers that can execute custom code at specific points in the server's lifecycle, including startup and shutdown.
*Writing Data on Shutdown:* Extensions can use the shmem_exit hook to register a function that PostgreSQL will call during its shutdown sequence. In the case of pg_stat_statements, it registers a function that serializes the collected SQL statement statistics to a file (e.g., pg_stat/pg_stat_statements.stat). This serialization process involves writing the data to a disk in a format that the extension can later read.
See
https://github.com/neondatabase/postgres/blob/f7ea954989a2e7901f858779cff55259f203479a/contrib/pg_stat_statements/pg_stat_statements.c#L556
*Reading Data on Startup:* Upon server startup, after the extension is loaded, it can read the previously saved file to restore the statistics. This is typically done by registering a shmem_startup_hook in the _PG_init function of the extension, which is called when the extension is loaded into the PostgreSQL server process. The extension checks for the presence of its data file and loads it to initialize its in-memory data structures with the saved statistics.
See
https://github.com/neondatabase/postgres/blob/f7ea954989a2e7901f858779cff55259f203479a/contrib/pg_stat_statements/pg_stat_statements.c#L461
*Ensuring Data Integrity:* To ensure data integrity across unexpected shutdowns, extensions like pg_stat_statements can also leverage PostgreSQL's WAL (Write-Ahead Logging) for critical data that must survive crashes. However, for performance statistics, a simple file-based persistence mechanism is often sufficient and involves less overhead.
The following important extensions are supported by Neon and we could either provide an extension specific mechanism to survive their AUX files or write a generic mechanism that can support more extensions in their hooks.
#### pg_stat_statements
* Stale file: Not currently supported, but could be added if desired.
* Lost file: Results in lost stats, but everything else will work.
The file is only saved on a clean shutdown and read on startup. The maximum size depends on the `pgss_max` setting (the maximum amount of tracked queries), with the default value being 5000. Using `sqlsmith` to generate random SQL queries and fill the statement cache to 4967 entries, the pg_stat/pg_stat_statements.stat file was 12MB and compressed to 2.1MB with gzip.
#### pg_prewarm
No stale file support currently, but it could be added if needed.
This data is only saved on a clean shutdown and read on startup, stored in the `autoprewarm.blocks` file. The size is `NBuffers * 20 bytes`. `NBuffers` is the number of 8KB pages in shared buffers. Thus, for 16GB shared buffers, this file would be approximately 40MB and will grow linearly with the size of the shared buffers. We should note that in Neon the size of shared buffers might change between supend and start due to autoscaling.
## Summary
Files associated with replication slots differ from others in two main aspects:
1. Durability Requirement: The durability requirement for replication slot files is higher. Losing a replication slot file means it cannot be regenerated, and replication will consequently break
2. Access Pattern: Access to these files occurs while Postgres is running, not just at startup and shutdown.
For statistics files, the stakes for durability are lower since these files can be regenerated, albeit with a performance cost.
For all of that files Point-In-Time Recovery (PITR) is not strictly necessary. It would be a nice-to-have feature but is not essential.
## Possible storages
### s3fs
Mounting an `/<branch_id>/<endpoint_id>` directory into each running compute node is a viable option.
Pros:
* Eliminates the need to modify Postgres code with special file access routines for WAL logging on write or some API access on read/write.
* The file system API provides built-in access laziness, meaning prewarm, stats, and PGSS files don't need to be downloaded before database startup (background workers can read them as needed).
Cons:
* Mount time adds to start latency. Some slot files must be downloaded for Postgres to start, requiring careful batch access to prevent slow starts with thousands of small files. TODO: A detailed list of files that can block start needs to be compiled. TODO: check if snapshot files need to be read at startup.
It not clear how to exactly mount S3 in our setup. We should restrict access to only a certain prefix, e.g. `/<branch_id>/<endpoint_id>`. Then we have to mount it somehow. AWS has `s3-csi`, but it is mounted on pod start and with our approach of pre-created computes we don't know endpoint/tenant during pod start. So we have to manage mount inside pod. That means setting `cap_admin` on pod. So not sure how to do that with pod. On the othe hand things are easier with NeonVM: in each pod we have VM and we have root access to it, so we can mount S3.
*Alternative:*
* create one filesystem for all tenants but encrypt its content with tenant-specific keys (using stackable encryption system like eCryptFS that can be mounted on top of CSI filesystem)
* mount the filesystem on pod start
* configure each tenant with the eCryptFS encryption key for its prefix
* on compute start the tenant mounts the eCryptFS for its' prefix files
TODO: Determine how to share credentials with the VM and restrict access to a specific prefix.
Note: S3FS is not suited for general-purpose file systems due to its limitations, such as no in-place updates and high latency. However, it is suitable for the files discussed here.
### EBS
EBS is per-Availability Zone (AZ), introducing a dependency on AZ for branch/endpoint, which is undesirable.
### EFS
Built on top of NFS, EFS could be a potential solution but shares similar mounting issues with S3FS. Restricting access to specific paths is more challenging, and past issues with Vector on EFS leading to corrupted files (possibly due to Vector) make it a less favorable option.
### Logical messages + basebackup
This is our current approach.
Pros:
* Eliminates the need for an additional service, whether internal or external.
Cons:
* Lacks start-up laziness as all files are loaded via base backup
* Does not support file updates on replicas, which are necessary for chain replication and logical decoding on standby
### Centralized service
Pros:
* Flexible
Cons:
* Requires instrumentation for all access, both reads and writes, complicating the implementation.

View File

@@ -21,7 +21,7 @@ implementation where we keep more data than we would need to, do not
change the synthetic size or incur any costs to the user.
The synthetic size is calculated for the whole project. It is not
straighforward to attribute size to individual branches. See "What is
straightforward to attribute size to individual branches. See "What is
the size of an individual branch?" for discussion on those
difficulties.
@@ -248,7 +248,7 @@ and truncate the WAL.
Synthetic size is calculated for the whole project, and includes all
branches. There is no such thing as the size of a branch, because it
is not straighforward to attribute the parts of size to individual
is not straightforward to attribute the parts of size to individual
branches.
## Example: attributing size to branches

View File

@@ -127,6 +127,10 @@ impl JwtAuth {
Ok(Self::new(decoding_keys))
}
pub fn from_key(key: String) -> Result<Self> {
Ok(Self::new(vec![DecodingKey::from_ed_pem(key.as_bytes())?]))
}
/// Attempt to decode the token with the internal decoding keys.
///
/// The function tries the stored decoding keys in succession,

View File

@@ -682,7 +682,7 @@ async fn get_lsn_by_timestamp_handler(
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
.await?;
#[derive(serde::Serialize)]
#[derive(serde::Serialize, Debug)]
struct Result {
lsn: Lsn,
kind: &'static str,
@@ -693,7 +693,14 @@ async fn get_lsn_by_timestamp_handler(
LsnForTimestamp::Past(lsn) => (lsn, "past"),
LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
};
json_response(StatusCode::OK, Result { lsn, kind })
let result = Result { lsn, kind };
tracing::info!(
lsn=?result.lsn,
kind=%result.kind,
timestamp=%timestamp_raw,
"lsn_by_timestamp finished"
);
json_response(StatusCode::OK, result)
}
async fn get_timestamp_of_lsn_handler(

View File

@@ -124,7 +124,7 @@ pub(super) enum FlushLoopState {
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Hole {
pub(crate) struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
@@ -565,19 +565,19 @@ impl From<GetReadyAncestorError> for PageReconstructError {
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
/// Get the ancestor's timeline id
pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
self.ancestor_timeline
.as_ref()
.map(|ancestor| ancestor.timeline_id)
}
/// Lock and get timeline's GC cutoff
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
}
@@ -733,27 +733,27 @@ impl Timeline {
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub fn get_last_record_lsn(&self) -> Lsn {
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
}
pub fn get_prev_record_lsn(&self) -> Lsn {
pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().prev
}
/// Atomically get both last and prev.
pub fn get_last_record_rlsn(&self) -> RecordLsn {
pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
self.last_record_lsn.load()
}
pub fn get_disk_consistent_lsn(&self) -> Lsn {
pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}
/// remote_consistent_lsn from the perspective of the tenant's current generation,
/// not validated with control plane yet.
/// See [`Self::get_remote_consistent_lsn_visible`].
pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_projected()
} else {
@@ -764,7 +764,7 @@ impl Timeline {
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
/// i.e. a value of remote_consistent_lsn_projected which has undergone
/// generation validation in the deletion queue.
pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_visible()
} else {
@@ -775,7 +775,7 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub async fn layer_size_sum(&self) -> u64 {
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut size = 0;
@@ -785,7 +785,7 @@ impl Timeline {
size
}
pub fn resident_physical_size(&self) -> u64 {
pub(crate) fn resident_physical_size(&self) -> u64 {
self.metrics.resident_physical_size_get()
}
@@ -861,7 +861,7 @@ impl Timeline {
}
/// Check that it is valid to request operations with that lsn.
pub fn check_lsn_is_in_scope(
pub(crate) fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
@@ -877,7 +877,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -1021,7 +1021,7 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub async fn writer(&self) -> TimelineWriter<'_> {
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().await,
@@ -1033,7 +1033,7 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let guard = self.layers.read().await;
@@ -1071,13 +1071,16 @@ impl Timeline {
Ok(())
}
pub fn activate(
pub(crate) fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.spawn_initial_logical_size_computation_task(ctx);
if self.tenant_shard_id.is_zero() {
// Logical size is only maintained accurately on shard zero.
self.spawn_initial_logical_size_computation_task(ctx);
}
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(background_jobs_can_start);
@@ -1172,7 +1175,7 @@ impl Timeline {
self.gate.close().await;
}
pub fn set_state(&self, new_state: TimelineState) {
pub(crate) fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
@@ -1192,7 +1195,7 @@ impl Timeline {
}
}
pub fn set_broken(&self, reason: String) {
pub(crate) fn set_broken(&self, reason: String) {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
let broken_state = TimelineState::Broken {
reason,
@@ -1206,27 +1209,27 @@ impl Timeline {
self.cancel.cancel();
}
pub fn current_state(&self) -> TimelineState {
pub(crate) fn current_state(&self) -> TimelineState {
self.state.borrow().clone()
}
pub fn is_broken(&self) -> bool {
pub(crate) fn is_broken(&self) -> bool {
matches!(&*self.state.borrow(), TimelineState::Broken { .. })
}
pub fn is_active(&self) -> bool {
pub(crate) fn is_active(&self) -> bool {
self.current_state() == TimelineState::Active
}
pub fn is_stopping(&self) -> bool {
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
pub async fn wait_to_become_active(
pub(crate) async fn wait_to_become_active(
&self,
_ctx: &RequestContext, // Prepare for use by cancellation
) -> Result<(), TimelineState> {
@@ -1251,7 +1254,7 @@ impl Timeline {
}
}
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
@@ -1275,7 +1278,10 @@ impl Timeline {
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
pub(crate) async fn download_layer(
&self,
layer_file_name: &str,
) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
@@ -1292,7 +1298,7 @@ impl Timeline {
/// Evict just one layer.
///
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
.enter()
@@ -1315,7 +1321,7 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
pub fn get_lazy_slru_download(&self) -> bool {
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.lazy_slru_download
@@ -1852,6 +1858,12 @@ impl Timeline {
priority: GetLogicalSizePriority,
ctx: &RequestContext,
) -> logical_size::CurrentLogicalSize {
if !self.tenant_shard_id.is_zero() {
// Logical size is only accurately maintained on shard zero: when called elsewhere, for example
// when HTTP API is serving a GET for timeline zero, return zero
return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
}
let current_size = self.current_logical_size.current_size();
debug!("Current size: {current_size:?}");
@@ -2094,7 +2106,7 @@ impl Timeline {
.expect("only this task sets it");
}
pub fn spawn_ondemand_logical_size_calculation(
pub(crate) fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
@@ -2140,6 +2152,9 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
span::debug_assert_current_span_has_tenant_and_timeline_id();
// We should never be calculating logical sizes on shard !=0, because these shards do not have
// accurate relation sizes, and they do not emit consumption metrics.
debug_assert!(self.tenant_shard_id.is_zero());
let _guard = self.gate.enter();
@@ -2173,7 +2188,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn calculate_logical_size(
async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
@@ -2828,6 +2843,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip_all, fields(layer=%frozen_layer))]
async fn flush_frozen_layer(
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
@@ -3422,7 +3438,7 @@ enum DurationRecorder {
}
impl DurationRecorder {
pub fn till_now(&self) -> DurationRecorder {
fn till_now(&self) -> DurationRecorder {
match self {
DurationRecorder::NotStarted => {
panic!("must only call on recorded measurements")
@@ -3433,7 +3449,7 @@ impl DurationRecorder {
}
}
}
pub fn into_recorded(self) -> Option<RecordedDuration> {
fn into_recorded(self) -> Option<RecordedDuration> {
match self {
DurationRecorder::NotStarted => None,
DurationRecorder::Recorded(recorded, _) => Some(recorded),
@@ -4373,10 +4389,6 @@ impl Timeline {
guard.finish_gc_timeline(&gc_layers);
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
}
#[cfg(feature = "testing")]
{
result.doomed_layers = gc_layers;
@@ -4633,7 +4645,9 @@ impl Timeline {
}
}
pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
pub(crate) fn get_download_all_remote_layers_task_info(
&self,
) -> Option<DownloadRemoteLayersTaskInfo> {
self.download_all_remote_layers_task_info
.read()
.unwrap()
@@ -4729,7 +4743,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
// but will cause large code changes.
pub struct TimelineWriter<'a> {
pub(crate) struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
@@ -4747,7 +4761,7 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub async fn put(
pub(crate) async fn put(
&self,
key: Key,
lsn: Lsn,

View File

@@ -319,6 +319,13 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<()> {
if !self.tenant_shard_id.is_zero() {
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
// for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
// skip imitating logical size accesses for eviction purposes.
return ControlFlow::Continue(());
}
let mut state = self.eviction_task_timeline_state.lock().await;
// Only do the imitate_layer accesses approximately as often as the threshold. A little

View File

@@ -101,6 +101,14 @@ impl From<&Exact> for u64 {
}
}
impl Approximate {
/// For use in situations where we don't have a sane logical size value but need
/// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
pub(crate) fn zero() -> Self {
Self(0)
}
}
impl CurrentLogicalSize {
pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
match self {

View File

@@ -426,13 +426,21 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let current_timeline_size = timeline
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy();
let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
timeline
.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::User,
&ctx,
)
// FIXME: https://github.com/neondatabase/neon/issues/5963
.size_dont_care_about_accuracy()
} else {
// Non-zero shards send zero for logical size. The safekeeper will ignore
// this number. This is because in a sharded tenant, only shard zero maintains
// accurate logical size.
0
};
let status_update = PageserverFeedback {
current_timeline_size,
last_received_lsn,

View File

@@ -1,60 +0,0 @@
From de3dd0cd034d2bcc12b456171ce163bdc1f4cb65 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 1 Feb 2024 17:42:31 +0200
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
Now that the WAL-logging happens as a separate step at the end of the
build, we need a few neon-specific hints to make it work.
---
src/hnswbuild.c | 28 ++++++++++++++++++++++++++++
1 file changed, 28 insertions(+)
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index 680789b..bfa657a 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -1089,13 +1089,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif
+#ifdef NEON_SMGR
+ smgr_start_unlogged_build(RelationGetSmgr(index));
+#endif
+
InitBuildState(buildstate, heap, index, indexInfo, forkNum);
BuildGraph(buildstate, forkNum);
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
if (RelationNeedsWAL(index))
+ {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
+ RelFileLocator rlocator = RelationGetSmgr(index)->smgr_rlocator.locator;
+#else
+ RelFileNode rlocator = RelationGetSmgr(index)->smgr_rnode.node;
+#endif
+
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
+ }
+
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
FreeBuildState(buildstate);
}
--
2.39.2

View File

@@ -804,6 +804,9 @@ ApplyRecord(StringInfo input_message)
ErrorContextCallback errcallback;
#if PG_VERSION_NUM >= 150000
DecodedXLogRecord *decoded;
#define STATIC_DECODEBUF_SIZE (64 * 1024)
static char *static_decodebuf = NULL;
size_t required_space;
#endif
/*
@@ -833,7 +836,19 @@ ApplyRecord(StringInfo input_message)
XLogBeginRead(reader_state, lsn);
#if PG_VERSION_NUM >= 150000
decoded = (DecodedXLogRecord *) XLogReadRecordAlloc(reader_state, record->xl_tot_len, true);
/*
* For reasonably small records, reuse a fixed size buffer to reduce
* palloc overhead.
*/
required_space = DecodeXLogRecordRequiredSpace(record->xl_tot_len);
if (required_space <= STATIC_DECODEBUF_SIZE)
{
if (static_decodebuf == NULL)
static_decodebuf = MemoryContextAlloc(TopMemoryContext, STATIC_DECODEBUF_SIZE);
decoded = (DecodedXLogRecord *) static_decodebuf;
}
else
decoded = palloc(required_space);
if (!DecodeXLogRecord(reader_state, decoded, record, lsn, &errormsg))
elog(ERROR, "failed to decode WAL record: %s", errormsg);
@@ -842,37 +857,15 @@ ApplyRecord(StringInfo input_message)
/* Record the location of the next record. */
decoded->next_lsn = reader_state->NextRecPtr;
/*
* If it's in the decode buffer, mark the decode buffer space as
* occupied.
*/
if (!decoded->oversized)
{
/* The new decode buffer head must be MAXALIGNed. */
Assert(decoded->size == MAXALIGN(decoded->size));
if ((char *) decoded == reader_state->decode_buffer)
reader_state->decode_buffer_tail = reader_state->decode_buffer + decoded->size;
else
reader_state->decode_buffer_tail += decoded->size;
}
/* Insert it into the queue of decoded records. */
Assert(reader_state->decode_queue_tail != decoded);
if (reader_state->decode_queue_tail)
reader_state->decode_queue_tail->next = decoded;
reader_state->decode_queue_tail = decoded;
if (!reader_state->decode_queue_head)
reader_state->decode_queue_head = decoded;
/*
* Update the pointers to the beginning and one-past-the-end of this
* record, again for the benefit of historical code that expected the
* decoder to track this rather than accessing these fields of the record
* itself.
*/
reader_state->record = reader_state->decode_queue_head;
reader_state->ReadRecPtr = reader_state->record->lsn;
reader_state->EndRecPtr = reader_state->record->next_lsn;
reader_state->record = decoded;
reader_state->ReadRecPtr = decoded->lsn;
reader_state->EndRecPtr = decoded->next_lsn;
}
#else
/*
@@ -912,8 +905,9 @@ ApplyRecord(StringInfo input_message)
elog(TRACE, "applied WAL record with LSN %X/%X",
(uint32) (lsn >> 32), (uint32) lsn);
#if PG_VERSION_NUM >= 150000
if (decoded && decoded->oversized)
if ((char *) decoded != static_decodebuf)
pfree(decoded);
#endif
}

View File

@@ -12,8 +12,7 @@ use crate::console::errors::GetAuthInfoError;
use crate::console::provider::{CachedRoleSecret, ConsoleBackend};
use crate::console::AuthSecret;
use crate::context::RequestMonitoring;
use crate::proxy::connect_compute::handle_try_wake;
use crate::proxy::retry::retry_after;
use crate::proxy::wake_compute::wake_compute;
use crate::proxy::NeonOptions;
use crate::stream::Stream;
use crate::{
@@ -28,11 +27,26 @@ use crate::{
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
use futures::TryFutureExt;
use std::borrow::Cow;
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, warn};
use tracing::info;
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
pub enum MaybeOwned<'a, T> {
Owned(T),
Borrowed(&'a T),
}
impl<T> std::ops::Deref for MaybeOwned<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match self {
MaybeOwned::Owned(t) => t,
MaybeOwned::Borrowed(t) => t,
}
}
}
/// This type serves two purposes:
///
@@ -44,12 +58,9 @@ use tracing::{error, info, warn};
/// backends which require them for the authentication process.
pub enum BackendType<'a, T> {
/// Cloud API (V2).
Console(Cow<'a, ConsoleBackend>, T),
Console(MaybeOwned<'a, ConsoleBackend>, T),
/// Authentication via a web browser.
Link(Cow<'a, url::ApiUrl>),
#[cfg(test)]
/// Test backend.
Test(&'a dyn TestBackend),
Link(MaybeOwned<'a, url::ApiUrl>),
}
pub trait TestBackend: Send + Sync + 'static {
@@ -67,14 +78,14 @@ impl std::fmt::Display for BackendType<'_, ()> {
ConsoleBackend::Console(endpoint) => {
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
}
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
ConsoleBackend::Postgres(endpoint) => {
fmt.debug_tuple("Postgres").field(&endpoint.url()).finish()
}
#[cfg(test)]
ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(),
},
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
#[cfg(test)]
Test(_) => fmt.debug_tuple("Test").finish(),
}
}
}
@@ -85,10 +96,8 @@ impl<T> BackendType<'_, T> {
pub fn as_ref(&self) -> BackendType<'_, &T> {
use BackendType::*;
match self {
Console(c, x) => Console(Cow::Borrowed(c), x),
Link(c) => Link(Cow::Borrowed(c)),
#[cfg(test)]
Test(x) => Test(*x),
Console(c, x) => Console(MaybeOwned::Borrowed(c), x),
Link(c) => Link(MaybeOwned::Borrowed(c)),
}
}
}
@@ -102,8 +111,6 @@ impl<'a, T> BackendType<'a, T> {
match self {
Console(c, x) => Console(c, f(x)),
Link(c) => Link(c),
#[cfg(test)]
Test(x) => Test(x),
}
}
}
@@ -116,8 +123,6 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
match self {
Console(c, x) => x.map(|x| Console(c, x)),
Link(c) => Ok(Link(c)),
#[cfg(test)]
Test(x) => Ok(Test(x)),
}
}
}
@@ -147,7 +152,7 @@ impl ComputeUserInfo {
}
pub enum ComputeCredentialKeys {
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
}
@@ -277,42 +282,6 @@ async fn authenticate_with_secret(
classic::authenticate(info, client, config, &mut ctx.latency_timer, secret).await
}
/// wake a compute (or retrieve an existing compute session from cache)
async fn wake_compute(
ctx: &mut RequestMonitoring,
api: &impl console::Api,
compute_credentials: ComputeCredentials<ComputeCredentialKeys>,
) -> auth::Result<(CachedNodeInfo, ComputeUserInfo)> {
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(ctx, &compute_credentials.info).await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
return Err(e.into());
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
Ok(ControlFlow::Break(n)) => break n,
}
let wait_duration = retry_after(num_retries);
num_retries += 1;
tokio::time::sleep(wait_duration).await;
};
ctx.set_project(node.aux.clone());
match compute_credentials.keys {
#[cfg(feature = "testing")]
ComputeCredentialKeys::Password(password) => node.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => node.config.auth_keys(auth_keys),
};
Ok((node, compute_credentials.info))
}
impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
/// Get compute endpoint name from the credentials.
pub fn get_endpoint(&self) -> Option<EndpointId> {
@@ -321,8 +290,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
match self {
Console(_, user_info) => user_info.endpoint_id.clone(),
Link(_) => Some("link".into()),
#[cfg(test)]
Test(_) => Some("test".into()),
}
}
@@ -333,8 +300,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
match self {
Console(_, user_info) => &user_info.user,
Link(_) => "link",
#[cfg(test)]
Test(_) => "test",
}
}
@@ -359,8 +324,20 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
let compute_credentials =
auth_quirks(ctx, &*api, user_info, client, allow_cleartext, config).await?;
let (cache_info, user_info) = wake_compute(ctx, &*api, compute_credentials).await?;
(cache_info, BackendType::Console(api, user_info))
let mut num_retries = 0;
let mut node =
wake_compute(&mut num_retries, ctx, &api, &compute_credentials.info).await?;
ctx.set_project(node.aux.clone());
match compute_credentials.keys {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => node.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => node.config.auth_keys(auth_keys),
};
(node, BackendType::Console(api, compute_credentials.info))
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => {
@@ -373,10 +350,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
BackendType::Link(url),
)
}
#[cfg(test)]
Test(_) => {
unreachable!("this function should never be called in the test backend")
}
};
info!("user successfully authenticated");
@@ -393,8 +366,6 @@ impl BackendType<'_, ComputeUserInfo> {
match self {
Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Link(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
#[cfg(test)]
Test(x) => x.get_allowed_ips_and_secret(),
}
}
@@ -409,8 +380,6 @@ impl BackendType<'_, ComputeUserInfo> {
match self {
Console(api, user_info) => api.wake_compute(ctx, user_info).map_ok(Some).await,
Link(_) => Ok(None),
#[cfg(test)]
Test(x) => x.wake_compute().map(Some),
}
}
}

View File

@@ -20,7 +20,7 @@ pub(super) async fn authenticate(
) -> auth::Result<ComputeCredentials<ComputeCredentialKeys>> {
let flow = AuthFlow::new(client);
let scram_keys = match secret {
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
AuthSecret::Md5(_) => {
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));

View File

@@ -172,7 +172,7 @@ pub(super) fn validate_password_and_exchange(
secret: AuthSecret,
) -> super::Result<sasl::Outcome<ComputeCredentialKeys>> {
match secret {
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
AuthSecret::Md5(_) => {
// test only
Ok(sasl::Outcome::Success(ComputeCredentialKeys::Password(

View File

@@ -1,5 +1,6 @@
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::MaybeOwned;
use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
@@ -17,9 +18,9 @@ use proxy::usage_metrics;
use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use std::{borrow::Cow, net::SocketAddr};
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -259,18 +260,13 @@ async fn main() -> anyhow::Result<()> {
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
match &**api {
proxy::console::provider::ConsoleBackend::Console(api) => {
let cache = api.caches.project_info.clone();
if let Some(url) = args.redis_notifications {
info!("Starting redis notifications listener ({url})");
maintenance_tasks
.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
let cache = api.caches.project_info.clone();
if let Some(url) = args.redis_notifications {
info!("Starting redis notifications listener ({url})");
maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
#[cfg(feature = "testing")]
proxy::console::provider::ConsoleBackend::Postgres(_) => {}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
@@ -369,18 +365,18 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let api = console::provider::ConsoleBackend::Console(api);
auth::BackendType::Console(Cow::Owned(api), ())
auth::BackendType::Console(MaybeOwned::Owned(api), ())
}
#[cfg(feature = "testing")]
AuthBackend::Postgres => {
let url = args.auth_endpoint.parse()?;
let api = console::provider::mock::Api::new(url);
let api = console::provider::ConsoleBackend::Postgres(api);
auth::BackendType::Console(Cow::Owned(api), ())
auth::BackendType::Console(MaybeOwned::Owned(api), ())
}
AuthBackend::Link => {
let url = args.uri.parse()?;
auth::BackendType::Link(Cow::Owned(url))
auth::BackendType::Link(MaybeOwned::Owned(url))
}
};
let http_config = HttpConfig {

View File

@@ -1,4 +1,4 @@
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
pub mod mock;
pub mod neon;
@@ -199,7 +199,7 @@ pub mod errors {
/// Auth secret which is managed by the cloud.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum AuthSecret {
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
/// Md5 hash of user's password.
Md5([u8; 16]),
@@ -264,13 +264,16 @@ pub trait Api {
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
#[derive(Clone)]
#[non_exhaustive]
pub enum ConsoleBackend {
/// Current Cloud API (V2).
Console(neon::Api),
/// Local mock of Cloud API (V2).
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
Postgres(mock::Api),
/// Internal testing
#[cfg(test)]
Test(Box<dyn crate::auth::backend::TestBackend>),
}
#[async_trait]
@@ -283,8 +286,10 @@ impl Api for ConsoleBackend {
use ConsoleBackend::*;
match self {
Console(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
Postgres(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(test)]
Test(_) => unreachable!("this function should never be called in the test backend"),
}
}
@@ -296,8 +301,10 @@ impl Api for ConsoleBackend {
use ConsoleBackend::*;
match self {
Console(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
Postgres(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(test)]
Test(api) => api.get_allowed_ips_and_secret(),
}
}
@@ -310,8 +317,10 @@ impl Api for ConsoleBackend {
match self {
Console(api) => api.wake_compute(ctx, user_info).await,
#[cfg(feature = "testing")]
#[cfg(any(test, feature = "testing"))]
Postgres(api) => api.wake_compute(ctx, user_info).await,
#[cfg(test)]
Test(api) => api.wake_compute(),
}
}
}

View File

@@ -19,7 +19,6 @@ use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Clone)]
pub struct Api {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,

View File

@@ -1,7 +1,7 @@
use std::{sync::Arc, time::SystemTime};
use anyhow::Context;
use bytes::BytesMut;
use bytes::{buf::Writer, BufMut, BytesMut};
use chrono::{Datelike, Timelike};
use futures::{Stream, StreamExt};
use parquet::{
@@ -192,8 +192,9 @@ async fn worker_inner(
let mut rows = Vec::with_capacity(config.rows_per_group);
let schema = rows.as_slice().schema()?;
let file = BytesWriter::default();
let mut w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
let buffer = BytesMut::new();
let w = buffer.writer();
let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
let mut last_upload = time::Instant::now();
@@ -221,20 +222,23 @@ async fn worker_inner(
}
if !w.flushed_row_groups().is_empty() {
let _: BytesWriter = upload_parquet(w, len, &storage).await?;
let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
}
Ok(())
}
async fn flush_rows(
async fn flush_rows<W>(
rows: Vec<RequestData>,
mut w: SerializedFileWriter<BytesWriter>,
mut w: SerializedFileWriter<W>,
) -> anyhow::Result<(
Vec<RequestData>,
SerializedFileWriter<BytesWriter>,
SerializedFileWriter<W>,
RowGroupMetaDataPtr,
)> {
)>
where
W: std::io::Write + Send + 'static,
{
let span = Span::current();
let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
let _enter = span.enter();
@@ -258,10 +262,10 @@ async fn flush_rows(
}
async fn upload_parquet(
w: SerializedFileWriter<BytesWriter>,
w: SerializedFileWriter<Writer<BytesMut>>,
len: i64,
storage: &GenericRemoteStorage,
) -> anyhow::Result<BytesWriter> {
) -> anyhow::Result<Writer<BytesMut>> {
let len_uncompressed = w
.flushed_row_groups()
.iter()
@@ -270,11 +274,12 @@ async fn upload_parquet(
// I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
// finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
let (mut file, metadata) = tokio::task::spawn_blocking(move || w.finish())
let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
.await
.unwrap()?;
let data = file.buf.split().freeze();
let mut buffer = writer.into_inner();
let data = buffer.split().freeze();
let compression = len as f64 / len_uncompressed as f64;
let size = data.len();
@@ -315,24 +320,7 @@ async fn upload_parquet(
.await
.context("request_data_upload")?;
Ok(file)
}
// why doesn't BytesMut impl io::Write?
#[derive(Default)]
struct BytesWriter {
buf: BytesMut,
}
impl std::io::Write for BytesWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
Ok(buffer.writer())
}
#[cfg(test)]

View File

@@ -5,6 +5,7 @@ pub mod connect_compute;
pub mod handshake;
pub mod passthrough;
pub mod retry;
pub mod wake_compute;
use crate::{
auth,

View File

@@ -1,15 +1,16 @@
use crate::{
auth,
compute::{self, PostgresConnection},
console::{self, errors::WakeComputeError, Api},
console::{self, errors::WakeComputeError},
context::RequestMonitoring,
metrics::{bool_to_str, NUM_CONNECTION_FAILURES, NUM_WAKEUP_FAILURES},
proxy::retry::{retry_after, ShouldRetry},
metrics::NUM_CONNECTION_FAILURES,
proxy::{
retry::{retry_after, ShouldRetry},
wake_compute::wake_compute,
},
};
use async_trait::async_trait;
use hyper::StatusCode;
use pq_proto::StartupMessageParams;
use std::ops::ControlFlow;
use tokio::time;
use tracing::{error, info, warn};
@@ -88,39 +89,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
WakeComputeError::TimeoutError => "timeout_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -137,7 +105,7 @@ where
mechanism.update_connect_config(&mut node_info.config);
// try once
let (config, err) = match mechanism
let err = match mechanism
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
.await
{
@@ -145,51 +113,27 @@ where
ctx.latency_timer.success();
return Ok(res);
}
Err(e) => {
error!(error = ?e, "could not connect to compute node");
(invalidate_cache(node_info), e)
}
Err(e) => e,
};
ctx.latency_timer.cache_miss();
error!(error = ?err, "could not connect to compute node");
let mut num_retries = 1;
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
let node_info = loop {
let wake_res = match user_info {
auth::BackendType::Console(api, user_info) => api.wake_compute(ctx, user_info).await,
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend
#[cfg(test)]
auth::BackendType::Test(x) => x.wake_compute(),
};
match user_info {
auth::BackendType::Console(api, info) => {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop
Ok(ControlFlow::Break(mut node_info)) => {
node_info.config.reuse_password(&config);
mechanism.update_connect_config(&mut node_info.config);
break node_info;
}
ctx.latency_timer.cache_miss();
let config = invalidate_cache(node_info);
node_info = wake_compute(&mut num_retries, ctx, api, info).await?;
node_info.config.reuse_password(&config);
mechanism.update_connect_config(&mut node_info.config);
}
let wait_duration = retry_after(num_retries);
num_retries += 1;
time::sleep(wait_duration).await;
// nothing to do?
auth::BackendType::Link(_) => {}
};
// now that we have a new node, try connect to it repeatedly.
@@ -221,23 +165,3 @@ where
time::sleep(wait_duration).await;
}
}
/// Attempts to wake up the compute node.
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
/// * Returns Ok(Break(node)) if the wakeup succeeded
/// * Returns Err(e) if there was an error
pub fn handle_try_wake(
result: Result<console::CachedNodeInfo, WakeComputeError>,
num_retries: u32,
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
match result {
Err(err) => match &err {
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
Ok(ControlFlow::Continue(err))
}
_ => Err(err),
},
// Ready to try again.
Ok(new) => Ok(ControlFlow::Break(new)),
}
}

View File

@@ -5,9 +5,9 @@ mod mitm;
use super::connect_compute::ConnectMechanism;
use super::retry::ShouldRetry;
use super::*;
use crate::auth::backend::{ComputeUserInfo, TestBackend};
use crate::auth::backend::{ComputeUserInfo, MaybeOwned, TestBackend};
use crate::config::CertResolver;
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
use crate::{auth, http, sasl, scram};
@@ -371,6 +371,7 @@ enum ConnectAction {
Fail,
}
#[derive(Clone)]
struct TestConnectMechanism {
counter: Arc<std::sync::Mutex<usize>>,
sequence: Vec<ConnectAction>,
@@ -490,9 +491,16 @@ fn helper_create_cached_node_info() -> CachedNodeInfo {
fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> (CachedNodeInfo, auth::BackendType<'_, ComputeUserInfo>) {
) -> (CachedNodeInfo, auth::BackendType<'static, ComputeUserInfo>) {
let cache = helper_create_cached_node_info();
let user_info = auth::BackendType::Test(mechanism);
let user_info = auth::BackendType::Console(
MaybeOwned::Owned(ConsoleBackend::Test(Box::new(mechanism.clone()))),
ComputeUserInfo {
endpoint: "endpoint".into(),
user: "user".into(),
options: NeonOptions::parse_options_raw(""),
},
);
(cache, user_info)
}

View File

@@ -0,0 +1,95 @@
use crate::auth::backend::ComputeUserInfo;
use crate::console::{
errors::WakeComputeError,
provider::{CachedNodeInfo, ConsoleBackend},
Api,
};
use crate::context::RequestMonitoring;
use crate::metrics::{bool_to_str, NUM_WAKEUP_FAILURES};
use crate::proxy::retry::retry_after;
use hyper::StatusCode;
use std::ops::ControlFlow;
use tracing::{error, warn};
use super::retry::ShouldRetry;
/// wake a compute (or retrieve an existing compute session from cache)
pub async fn wake_compute(
num_retries: &mut u32,
ctx: &mut RequestMonitoring,
api: &ConsoleBackend,
info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
loop {
let wake_res = api.wake_compute(ctx, info).await;
match handle_try_wake(wake_res, *num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e);
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
report_error(&e, true);
}
Ok(ControlFlow::Break(n)) => return Ok(n),
}
let wait_duration = retry_after(*num_retries);
*num_retries += 1;
tokio::time::sleep(wait_duration).await;
}
}
/// Attempts to wake up the compute node.
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
/// * Returns Ok(Break(node)) if the wakeup succeeded
/// * Returns Err(e) if there was an error
pub fn handle_try_wake(
result: Result<CachedNodeInfo, WakeComputeError>,
num_retries: u32,
) -> Result<ControlFlow<CachedNodeInfo, WakeComputeError>, WakeComputeError> {
match result {
Err(err) => match &err {
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
Ok(ControlFlow::Continue(err))
}
_ => Err(err),
},
// Ready to try again.
Ok(new) => Ok(ControlFlow::Break(new)),
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
WakeComputeError::TimeoutError => "timeout_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}

View File

@@ -3,6 +3,7 @@
//! Handles both SQL over HTTP and SQL over Websockets.
mod conn_pool;
mod json;
mod sql_over_http;
mod websocket;

View File

@@ -0,0 +1,448 @@
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
//
// Convert json non-string types to strings, so that they can be passed to Postgres
// as parameters.
//
pub fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
json.iter()
.map(|value| {
match value {
// special care for nulls
Value::Null => None,
// convert to text with escaping
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
})
.collect()
}
//
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
// in the array we need to escape the strings. Postgres is okay with arrays of form
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
// it for Postgres to check.
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
fn json_array_to_pg_array(value: &Value) -> Option<String> {
match value {
// special care for nulls
Value::Null => None,
// convert to text with escaping
// here string needs to be escaped, as it is part of the array
v @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => Some(v.to_string()),
v @ Value::Object(_) => json_array_to_pg_array(&Value::String(v.to_string())),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
.collect::<Vec<_>>()
.join(",");
Some(format!("{{{}}}", vals))
}
}
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(
row: &Row,
columns: &[Type],
raw_output: bool,
array_mode: bool,
) -> Result<Value, anyhow::Error> {
let iter = row
.columns()
.iter()
.zip(columns)
.enumerate()
.map(|(i, (column, typ))| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = if raw_output {
match pg_value {
Some(v) => Value::String(v.to_string()),
None => Value::Null,
}
} else {
pg_text_to_json(pg_value, typ)?
};
Ok((name.to_string(), json_value))
});
if array_mode {
// drop keys and aggregate into array
let arr = iter
.map(|r| r.map(|(_key, val)| val))
.collect::<Result<Vec<Value>, anyhow::Error>>()?;
Ok(Value::Array(arr))
} else {
let obj = iter.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(obj))
}
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
if let Some(val) = pg_value {
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres we don't check that all nested arrays have the same
// dimensions, we just return them as is.
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
}
fn _pg_array_parse(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), anyhow::Error> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
}
fn push_checked(
entry: &mut String,
entries: &mut Vec<Value>,
elem_type: &Type,
) -> Result<(), anyhow::Error> {
if !entry.is_empty() {
// While in usual postgres response we get nulls as None and everything else
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
// here while we have quotation info and convert them to None.
if entry == "NULL" {
entries.push(pg_text_to_json(None, elem_type)?);
} else {
entries.push(pg_text_to_json(Some(entry), elem_type)?);
}
entry.clear();
}
Ok(())
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
if c == '\\' {
escaped = true;
(i, c) = pg_array_chr.next().unwrap();
}
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' if !quote => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// end of quoted string, so push it manually without any checks
// for emptiness or nulls
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry.clear();
}
quote = !quote;
}
',' if !quote => {
push_checked(&mut entry, &mut entries, elem_type)?;
}
_ => {
entry.push(c);
}
}
}
if level != 0 {
return Err(anyhow::anyhow!("unbalanced array"));
}
Ok((Value::Array(entries), 0))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];
let pg_params = json_to_pg_text(json);
assert_eq!(
pg_params,
vec![Some("true".to_owned()), Some("false".to_owned())]
);
let json = vec![Value::Number(serde_json::Number::from(42))];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![Some("42".to_owned())]);
let json = vec![Value::String("foo\"".to_string())];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
let json = vec![Value::Null];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![None]);
}
#[test]
fn test_json_array_to_pg_array() {
// atoms and escaping
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
)]
);
// nested arrays
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
// array of objects
let json = r#"[{"foo": 1},{"bar": 2}]"#;
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
);
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
json!("Infinity")
);
assert_eq!(
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
json!("-Infinity")
);
let json: Value =
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
.unwrap();
assert_eq!(
pg_text_to_json(
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
&Type::JSONB
)
.unwrap(),
json
);
}
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
json!(["aa\"\\,a", "cha", "bbbb"])
);
assert_eq!(
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
json!([["foo", "bar"], ["bee", "bop"]])
);
assert_eq!(
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
json!([[[["foo", null, "bop", "bup"]]]])
);
assert_eq!(
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
);
}
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
assert_eq!(
pb(r#"{{t,f},{f,t}}"#),
json!([[true, false], [false, true]])
);
assert_eq!(
pb(r#"{{t,NULL},{NULL,f}}"#),
json!([[true, null], [null, false]])
);
}
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty).unwrap()
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
json!(["NaN", "Infinity", "-Infinity"])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
json!(["NaN", "Infinity", "-Infinity"])
);
}
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2).unwrap()
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
json!([{"foo": 1, "bar": 2}])
);
assert_eq!(
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
json!([{"foo": 1}, {"bar": 2}])
);
assert_eq!(
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
json!([[{"foo": 1}, {"bar": 2}]])
);
}
}

View File

@@ -12,16 +12,12 @@ use hyper::Response;
use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Row;
use tokio_postgres::Transaction;
use tracing::error;
use tracing::instrument;
@@ -40,6 +36,7 @@ use crate::RoleName;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
use super::json::{json_to_pg_text, pg_text_row_to_json};
use super::SERVERLESS_DRIVER_SNI;
#[derive(serde::Deserialize)]
@@ -72,62 +69,6 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
//
// Convert json non-string types to strings, so that they can be passed to Postgres
// as parameters.
//
fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
json.iter()
.map(|value| {
match value {
// special care for nulls
Value::Null => None,
// convert to text with escaping
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
})
.collect()
}
//
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
// in the array we need to escape the strings. Postgres is okay with arrays of form
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
// it for Postgres to check.
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
fn json_array_to_pg_array(value: &Value) -> Option<String> {
match value {
// special care for nulls
Value::Null => None,
// convert to text with escaping
// here string needs to be escaped, as it is part of the array
v @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => Some(v.to_string()),
v @ Value::Object(_) => json_array_to_pg_array(&Value::String(v.to_string())),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
.collect::<Vec<_>>()
.join(",");
Some(format!("{{{}}}", vals))
}
}
}
fn get_conn_info(
ctx: &mut RequestMonitoring,
headers: &HeaderMap,
@@ -611,389 +552,3 @@ async fn query_to_json<T: GenericClient>(
}),
))
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(
row: &Row,
columns: &[Type],
raw_output: bool,
array_mode: bool,
) -> Result<Value, anyhow::Error> {
let iter = row
.columns()
.iter()
.zip(columns)
.enumerate()
.map(|(i, (column, typ))| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = if raw_output {
match pg_value {
Some(v) => Value::String(v.to_string()),
None => Value::Null,
}
} else {
pg_text_to_json(pg_value, typ)?
};
Ok((name.to_string(), json_value))
});
if array_mode {
// drop keys and aggregate into array
let arr = iter
.map(|r| r.map(|(_key, val)| val))
.collect::<Result<Vec<Value>, anyhow::Error>>()?;
Ok(Value::Array(arr))
} else {
let obj = iter.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(obj))
}
}
//
// Convert postgres text-encoded value to JSON value
//
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
if let Some(val) = pg_value {
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres we don't check that all nested arrays have the same
// dimensions, we just return them as is.
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
}
fn _pg_array_parse(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), anyhow::Error> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
}
fn push_checked(
entry: &mut String,
entries: &mut Vec<Value>,
elem_type: &Type,
) -> Result<(), anyhow::Error> {
if !entry.is_empty() {
// While in usual postgres response we get nulls as None and everything else
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
// here while we have quotation info and convert them to None.
if entry == "NULL" {
entries.push(pg_text_to_json(None, elem_type)?);
} else {
entries.push(pg_text_to_json(Some(entry), elem_type)?);
}
entry.clear();
}
Ok(())
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
if c == '\\' {
escaped = true;
(i, c) = pg_array_chr.next().unwrap();
}
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' if !quote => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// end of quoted string, so push it manually without any checks
// for emptiness or nulls
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry.clear();
}
quote = !quote;
}
',' if !quote => {
push_checked(&mut entry, &mut entries, elem_type)?;
}
_ => {
entry.push(c);
}
}
}
if level != 0 {
return Err(anyhow::anyhow!("unbalanced array"));
}
Ok((Value::Array(entries), 0))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];
let pg_params = json_to_pg_text(json);
assert_eq!(
pg_params,
vec![Some("true".to_owned()), Some("false".to_owned())]
);
let json = vec![Value::Number(serde_json::Number::from(42))];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![Some("42".to_owned())]);
let json = vec![Value::String("foo\"".to_string())];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
let json = vec![Value::Null];
let pg_params = json_to_pg_text(json);
assert_eq!(pg_params, vec![None]);
}
#[test]
fn test_json_array_to_pg_array() {
// atoms and escaping
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
)]
);
// nested arrays
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
// array of objects
let json = r#"[{"foo": 1},{"bar": 2}]"#;
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]);
assert_eq!(
pg_params,
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
);
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
json!("Infinity")
);
assert_eq!(
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
json!("-Infinity")
);
let json: Value =
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
.unwrap();
assert_eq!(
pg_text_to_json(
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
&Type::JSONB
)
.unwrap(),
json
);
}
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
json!(["aa\"\\,a", "cha", "bbbb"])
);
assert_eq!(
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
json!([["foo", "bar"], ["bee", "bop"]])
);
assert_eq!(
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
json!([[[["foo", null, "bop", "bup"]]]])
);
assert_eq!(
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
);
}
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
assert_eq!(
pb(r#"{{t,f},{f,t}}"#),
json!([[true, false], [false, true]])
);
assert_eq!(
pb(r#"{{t,NULL},{NULL,f}}"#),
json!([[true, null], [null, false]])
);
}
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty).unwrap()
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
json!(["NaN", "Infinity", "-Infinity"])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
json!(["NaN", "Infinity", "-Infinity"])
);
}
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2).unwrap()
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
json!([{"foo": 1, "bar": 2}])
);
assert_eq!(
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
json!([{"foo": 1}, {"bar": 2}])
);
assert_eq!(
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
json!([[{"foo": 1}, {"bar": 2}]])
);
}
}

View File

@@ -482,6 +482,7 @@ class NeonEnvBuilder:
self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
self.config_init_force: Optional[str] = None
self.top_output_dir = top_output_dir
self.control_plane_compute_hook_api: Optional[str] = None
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
@@ -1007,6 +1008,9 @@ class NeonEnv:
# The base URL of the attachment service
self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}"
# For testing this with a fake HTTP server, enable passing through a URL from config
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
self, config.auth_enabled
)
@@ -1026,6 +1030,9 @@ class NeonEnv:
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_compute_hook_api is not None:
cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
@@ -1904,7 +1911,7 @@ class Pagectl(AbstractNeonCli):
class NeonAttachmentService:
def __init__(self, env: NeonEnv, auth_enabled):
def __init__(self, env: NeonEnv, auth_enabled: bool):
self.env = env
self.running = False
self.auth_enabled = auth_enabled
@@ -3123,6 +3130,20 @@ class Endpoint(PgProtocol):
log.info(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
# Please note: if you didn't respec this endpoint to have the `migrations`
# feature, this function will probably fail because neon_migration.migration_id
# won't exist. This is temporary - soon we'll get rid of the feature flag and
# migrations will be enabled for everyone.
def wait_for_migrations(self):
with self.cursor() as cur:
def check_migrations_done():
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id = cur.fetchall()[0][0]
assert migration_id != 0
wait_until(20, 0.5, check_migrations_done)
# Mock the extension part of spec passed from control plane for local testing
# endpooint.rs adds content of this file as a part of the spec.json
def create_remote_extension_spec(self, spec: dict[str, Any]):

View File

@@ -831,3 +831,16 @@ class PageserverHttpClient(requests.Session):
self.put(
f"http://localhost:{self.port}/v1/deletion_queue/flush?execute={'true' if execute else 'false'}"
).raise_for_status()
def timeline_wait_logical_size(self, tenant_id: TenantId, timeline_id: TimelineId) -> int:
detail = self.timeline_detail(
tenant_id,
timeline_id,
include_non_incremental_logical_size=True,
force_await_initial_logical_size=True,
)
current_logical_size = detail["current_logical_size"]
non_incremental = detail["current_logical_size_non_incremental"]
assert current_logical_size == non_incremental
assert isinstance(current_logical_size, int)
return current_logical_size

View File

@@ -356,10 +356,26 @@ def enable_remote_storage_versioning(
"""
Enable S3 versioning for the remote storage
"""
# local_fs has no
# local_fs has no support for versioning
assert isinstance(remote, S3Storage), "localfs is currently not supported"
assert remote.client is not None
# The SDK supports enabling versioning on normal S3 as well but we don't want to change
# these settings from a test in a live bucket (also, our access isn't enough nor should it be)
assert not remote.real, "Enabling storage versioning only supported on Mock S3"
# Workaround to enable self-copy until upstream bug is fixed: https://github.com/getmoto/moto/issues/7300
remote.client.put_bucket_encryption(
Bucket=remote.bucket_name,
ServerSideEncryptionConfiguration={
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
"BucketKeyEnabled": False,
},
]
},
)
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
response = remote.client.put_bucket_versioning(
Bucket=remote.bucket_name,

View File

@@ -155,6 +155,15 @@ class EvictionEnv:
mock_behavior,
eviction_order: EvictionOrder,
):
"""
Starts pageserver up with mocked statvfs setup. The startup is
problematic because of dueling initial logical size calculations
requiring layers and disk usage based task evicting.
Returns after initial logical sizes are complete, but the phase of disk
usage eviction task is unknown; it might need to run one more iteration
before assertions can be made.
"""
disk_usage_config = {
"period": period,
"max_usage_pct": max_usage_pct,
@@ -183,9 +192,15 @@ class EvictionEnv:
),
)
# we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
for tenant_id, timeline_id in self.timelines:
pageserver_http = self.neon_env.get_tenant_pageserver(tenant_id).http_client()
pageserver_http.timeline_wait_logical_size(tenant_id, timeline_id)
def statvfs_called():
assert pageserver.log_contains(".*running mocked statvfs.*")
# we most likely have already completed multiple runs
wait_until(10, 1, statvfs_called)
@@ -789,9 +804,11 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
def less_than_max_usage_pct():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert post_eviction_total_size < 0.33 * total_size, "we requested max 33% usage"
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
wait_until(2, 2, less_than_max_usage_pct)
def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
@@ -831,11 +848,13 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
def more_than_min_avail_bytes_freed():
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert (
total_size - post_eviction_total_size >= min_avail_bytes
), f"we requested at least {min_avail_bytes} worth of free space"
assert (
total_size - post_eviction_total_size >= min_avail_bytes
), "we requested at least min_avail_bytes worth of free space"
wait_until(2, 2, more_than_min_avail_bytes_freed)
def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):

View File

@@ -1,47 +0,0 @@
import subprocess
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# Test gc_cutoff
#
# This test sets fail point at the end of GC, and checks that pageserver
# normally restarts after it. Also, there should be GC ERRORs in the log,
# but the fixture checks the log for any unexpected ERRORs after every
# test anyway, so it doesn't need any special attention here.
@pytest.mark.timeout(600)
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "10 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_period": "5 s",
# set PITR interval to be small, so we can do GC
"pitr_interval": "1 s",
"compaction_threshold": "3",
"image_creation_threshold": "2",
}
)
pageserver_http = env.pageserver.http_client()
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
# Because this test does a rapid series of restarts of the same node, it's possible that
# we are restarted again before we can clean up deletion lists form the previous generation,
# resulting in a subsequent startup logging a warning.
env.pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
for _ in range(5):
with pytest.raises(subprocess.SubprocessError):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
env.pageserver.stop()
env.pageserver.start(extra_env_vars={"FAILPOINTS": "after-timeline-gc-removed-layers=exit"})

View File

@@ -1,4 +1,6 @@
import time
from random import choice
from string import ascii_lowercase
import pytest
from fixtures.log_helper import log
@@ -11,6 +13,10 @@ from fixtures.types import Lsn
from fixtures.utils import query_scalar
def random_string(n: int):
return "".join([choice(ascii_lowercase) for _ in range(n)])
def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
@@ -238,6 +244,57 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
) == endpoint.safe_psql("select sum(somedata) from replication_example")
# Test that WAL redo works for fairly large records.
#
# See https://github.com/neondatabase/neon/pull/6534. That wasn't a
# logical replication bug as such, but without logical replication,
# records passed ot the WAL redo process are never large enough to hit
# the bug.
def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
env.neon_cli.create_branch("init")
endpoint = env.endpoints.create_start("init")
cur = endpoint.connect().cursor()
cur.execute("CREATE TABLE reptbl(id int, largeval text);")
cur.execute("alter table reptbl replica identity full")
cur.execute("create publication pub1 for table reptbl")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("CREATE TABLE reptbl(id int, largeval text);")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
# Test simple insert, update, delete. But with very large values
value = random_string(10_000_000)
cur.execute(f"INSERT INTO reptbl VALUES (1, '{value}')")
logical_replication_sync(vanilla_pg, endpoint)
assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(1, value)]
# Test delete, and reinsert another value
cur.execute("DELETE FROM reptbl WHERE id = 1")
cur.execute(f"INSERT INTO reptbl VALUES (2, '{value}')")
logical_replication_sync(vanilla_pg, endpoint)
assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
value = random_string(10_000_000)
cur.execute(f"UPDATE reptbl SET largeval='{value}'")
logical_replication_sync(vanilla_pg, endpoint)
assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
endpoint.stop()
endpoint.start()
cur = endpoint.connect().cursor()
value = random_string(10_000_000)
cur.execute(f"UPDATE reptbl SET largeval='{value}'")
logical_replication_sync(vanilla_pg, endpoint)
assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
#
# Check that slots are not inherited in brnach
#

View File

@@ -13,12 +13,14 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
endpoint.start()
time.sleep(1) # Sleep to let migrations run
endpoint.wait_for_migrations()
num_migrations = 3
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id = cur.fetchall()
assert migration_id[0][0] == 3
assert migration_id[0][0] == num_migrations
with open(log_path, "r") as log_file:
logs = log_file.read()
@@ -26,11 +28,13 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint.stop()
endpoint.start()
time.sleep(1) # Sleep to let migrations run
# We don't have a good way of knowing that the migrations code path finished executing
# in compute_ctl in the case that no migrations are being run
time.sleep(1)
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id = cur.fetchall()
assert migration_id[0][0] == 3
assert migration_id[0][0] == num_migrations
with open(log_path, "r") as log_file:
logs = log_file.read()

View File

@@ -1,8 +1,7 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
@@ -19,7 +18,8 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
sub.respec(skip_pg_catalog_updates=False, features=["migrations"])
sub.start()
time.sleep(1) # Sleep to let migrations run
pub.wait_for_migrations()
sub.wait_for_migrations()
with pub.cursor() as cur:
cur.execute(
@@ -68,10 +68,11 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
pcur.execute("INSERT INTO t VALUES (30), (40)")
time.sleep(1) # Give the change time to propagate
def check_that_changes_propagated():
cur.execute("SELECT * FROM t")
res = cur.fetchall()
log.info(res)
assert len(res) == 4
assert [r[0] for r in res] == [10, 20, 30, 40]
cur.execute("SELECT * FROM t")
res = cur.fetchall()
log.info(res)
assert len(res) == 4
assert [r[0] for r in res] == [10, 20, 30, 40]
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -1,7 +1,6 @@
import time
from datetime import datetime, timezone
import pytest
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
@@ -32,7 +31,6 @@ def test_tenant_s3_restore(
remote_storage = neon_env_builder.pageserver_remote_storage
assert remote_storage, "remote storage not configured"
enable_remote_storage_versioning(remote_storage)
pytest.skip("moto doesn't support self-copy: https://github.com/getmoto/moto/issues/7300")
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
env.pageserver.allowed_errors.extend(

View File

@@ -1,14 +1,24 @@
import time
from collections import defaultdict
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def get_node_shard_counts(env: NeonEnv, tenant_ids):
counts: defaultdict[str, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.attachment_service.locate(tid):
counts[shard["node_id"]] += 1
return counts
def test_sharding_service_smoke(
@@ -54,14 +64,7 @@ def test_sharding_service_smoke(
for tid in tenant_ids:
env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
def get_node_shard_counts():
counts: defaultdict[str, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.attachment_service.locate(tid):
counts[shard["node_id"]] += 1
return counts
for node_id, count in get_node_shard_counts().items():
for node_id, count in get_node_shard_counts(env, tenant_ids).items():
# we used a multiple of pagservers for the total shard count,
# so expect equal number on all pageservers
assert count == tenant_shard_count / len(
@@ -89,7 +92,7 @@ def test_sharding_service_smoke(
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
def node_evacuated(node_id: int):
counts = get_node_shard_counts()
counts = get_node_shard_counts(env, tenant_ids)
assert counts[node_id] == 0
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
@@ -98,7 +101,7 @@ def test_sharding_service_smoke(
# immediately
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
time.sleep(1)
assert get_node_shard_counts()[env.pageservers[0].id] == 0
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
# Delete all the tenants
for tid in tenant_ids:
@@ -113,7 +116,7 @@ def test_sharding_service_smoke(
for tid in tenant_ids:
env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
counts = get_node_shard_counts()
counts = get_node_shard_counts(env, tenant_ids)
# Nothing should have been scheduled on the node in Draining
assert counts[env.pageservers[1].id] == 0
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
@@ -270,3 +273,73 @@ def test_sharding_service_onboarding(
# The onboarded tenant should surviev a restart of pageserver
dest_ps.stop()
dest_ps.start()
def test_sharding_service_compute_hook(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
"""
Test that the sharding service calls out to the configured HTTP endpoint on attachment changes
"""
# We will run two pageserver to migrate and check that the attachment service sends notifications
# when migrating.
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
# Set up fake HTTP notify endpoint
notifications = []
def handler(request: Request):
log.info(f"Notify request: {request}")
notifications.append(request.json)
return Response(status=200)
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start()
# We will to an unclean migration, which will result in deletion queue warnings
env.pageservers[0].allowed_errors.append(".*Dropped remote consistent LSN updates for tenant.*")
# Initial notification from tenant creation
assert len(notifications) == 1
expect = {
"tenant_id": str(env.initial_tenant),
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
}
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
def node_evacuated(node_id: int):
counts = get_node_shard_counts(env, [env.initial_tenant])
assert counts[node_id] == 0
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
# Additional notification from migration
log.info(f"notifications: {notifications}")
expect = {
"tenant_id": str(env.initial_tenant),
"shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}],
}
def received_migration_notification():
assert len(notifications) == 2
assert notifications[1] == expect
wait_until(20, 0.25, received_migration_notification)
# When we restart, we should re-emit notifications for all tenants
env.attachment_service.stop()
env.attachment_service.start()
def received_restart_notification():
assert len(notifications) == 3
assert notifications[1] == expect
wait_until(10, 1, received_restart_notification)

View File

@@ -20,7 +20,7 @@ from fixtures.neon_fixtures import (
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
@@ -40,7 +40,7 @@ def test_timeline_size(neon_simple_env: NeonEnv):
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty")
client = env.pageserver.http_client()
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
endpoint_main = env.endpoints.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
@@ -73,7 +73,7 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "empty")
client = env.pageserver.http_client()
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
timeline_details = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
@@ -153,7 +153,7 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
client = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_quota_on_startup")
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
endpoint_main = env.endpoints.create(
"test_timeline_size_quota_on_startup",
@@ -219,7 +219,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
client = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_quota")
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
endpoint_main = env.endpoints.create(
"test_timeline_size_quota",
@@ -715,28 +715,6 @@ def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
# Timeline logical size initialization is an asynchronous background task that runs once,
# try a few times to ensure it's activated properly
def wait_for_timeline_size_init(
client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
):
for i in range(10):
timeline_details = client.timeline_detail(
tenant, timeline, include_non_incremental_logical_size=True
)
current_logical_size = timeline_details["current_logical_size"]
non_incremental = timeline_details["current_logical_size_non_incremental"]
if current_logical_size == non_incremental:
return
log.info(
f"waiting for current_logical_size of a timeline to be calculated, iteration {i}: {current_logical_size} vs {non_incremental}"
)
time.sleep(1)
raise Exception(
f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"
)
def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
"""
Tenants warmuping up opportunistically will wait for one another's logical size calculations to complete

View File

@@ -15,7 +15,7 @@ publish = false
[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
aws-config = { version = "1", default-features = false, features = ["rustls", "sso"] }
aws-runtime = { version = "1", default-features = false, features = ["event-stream", "sigv4a"] }
aws-runtime = { version = "1", default-features = false, features = ["event-stream", "http-02x", "sigv4a"] }
aws-sigv4 = { version = "1", features = ["http0-compat", "sign-eventstream", "sigv4a"] }
aws-smithy-async = { version = "1", default-features = false, features = ["rt-tokio"] }
aws-smithy-http = { version = "0.60", default-features = false, features = ["event-stream"] }