Compare commits


20 Commits

Author SHA1 Message Date
Konstantin Knizhnik
7c8e87056b Move check of page LSN to check_page_lsn() function 2025-07-14 18:03:58 +03:00
Konstantin Knizhnik
1db119c657 Use GetCurrentReplayRecPtr instead of GetXLogReplayRecPtr in the check for returned page LSN 2025-07-14 08:44:05 +03:00
Konstantin Knizhnik
00826b4082 Handle case of 0 replay_lsn 2025-07-13 20:41:49 +03:00
Konstantin Knizhnik
b726293ec3 Fix indentation 2025-07-13 16:55:29 +03:00
Konstantin Knizhnik
c42c38138e Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:41 +03:00
Konstantin Knizhnik
af61b7238d Set request_lsn=max(gc_cutoff,request_lsn) 2025-07-12 15:55:35 +03:00
Dmitrii Kovalkov
ee7bb1a667 storcon: validate new_sk_set before starting safekeeper migration (#12546)
## Problem
We don't validate the `new_sk_set` before starting the migration. It is
validated later, so a migration to an invalid safekeeper set will fail
anyway, but at that point we might have already committed an invalid
`new_sk_set` to the database, and there is no `abort` command yet (I ran
into this issue in neon_local and ruined the timeline :)

- Part of https://github.com/neondatabase/neon/issues/11669

## Summary of changes
- Add safekeeper count and safekeeper duplication checks before starting
the migration (a sketch of such checks follows this list)
- Test that we validate the `new_sk_set` before starting the migration
- Add a `force` option to the `TimelineSafekeeperMigrateRequest` to
disable non-mandatory checks
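
A minimal sketch of the kind of pre-flight validation described above; the function and field names are hypothetical, not the actual storcon code:

```
use std::collections::HashSet;

/// Illustrative pre-flight validation of a proposed safekeeper set.
/// Plain u64 stands in for the repo's NodeId; the helper itself is hypothetical.
fn validate_new_sk_set(new_sk_set: &[u64], required_count: usize) -> Result<(), String> {
    // Check the safekeeper count before committing anything to the database.
    if new_sk_set.len() != required_count {
        return Err(format!(
            "expected {required_count} safekeepers, got {}",
            new_sk_set.len()
        ));
    }
    // Reject duplicated safekeeper ids.
    let unique: HashSet<_> = new_sk_set.iter().collect();
    if unique.len() != new_sk_set.len() {
        return Err("new_sk_set contains duplicate safekeeper ids".to_string());
    }
    Ok(())
}
```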
2025-07-12 04:57:04 +00:00
Conrad Ludgate
9bba31bf68 proxy: encode json as we parse rows (#11992)
Serialize query row responses directly into JSON. Some of this code
should be using the `json::value_as_object/list` macros, but I've
avoided it for now to minimize the size of the diff.
2025-07-11 19:39:08 +00:00
Folke Behrens
380d167b7c proxy: For cancellation data replace HSET+EXPIRE/HGET with SET..EX/GET (#12553)
## Problem

To store cancellation data we send two commands to redis because the
redis server version doesn't support HSET with EX. Also, HSET is not
really needed.

## Summary of changes

* Replace the HSET + EXPIRE command pair with a single SET .. EX command
(see the sketch after this list).
* Replace HGET with GET.
* Leave a workaround for old keys set with HSET.
* Replace some anyhow errors with specific errors to surface the
WRONGTYPE error from redis.
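
A conceptual sketch of the command change; the key name, field name, and TTL are illustrative:

```
# Old: hash write plus a separate TTL command
HSET cancel:<key> data <payload>
EXPIRE cancel:<key> 600

# New: one SET stores the value and the TTL atomically
SET cancel:<key> <payload> EX 600

# Reads change from HGET to a plain GET
GET cancel:<key>
```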
2025-07-11 19:35:42 +00:00
HaoyuHuang
cb991fba42 A few more PS changes (#12552)
# TLDR
Problem I is a bug fix; the rest are no-ops.

## Problem I
The pageserver triggers image layer creation based on elapsed time, but this
check also depends on the current logical size, which is only computed on
shard 0. For non-zero shards the check is therefore ineffective, and image
creation is never done for idle tenants.

## Summary of changes I
This PR fixes the problem by simply removing the dependency on current
logical size.
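
A minimal sketch of the purely time-based trigger described in Summary of changes I; the helper name is hypothetical, not the actual pageserver code:

```
use std::time::{Duration, Instant};

/// Decide whether to attempt image layer creation purely on elapsed time,
/// without consulting the current logical size (which only shard 0 computes).
fn should_check_image_layers(last_check: Instant, interval: Duration) -> bool {
    last_check.elapsed() >= interval
}
```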

## Summary of changes II
This PR adds a timeout when calling the pageserver to split a shard, to make
sure the SC does not wait for the API call forever. The PR currently doesn't
add any retry logic, because it's not clear whether a pageserver shard split
can be safely retried if the existing operation is still ongoing or has left
the storage in a bad state. It's better to abort the whole operation and
restart.

## Problem III
`test_remote_failures` requires the PS to be compiled in testing mode.
PS builds in dev/staging are compiled without this mode.

## Summary of changes III
Remove the restriction and also increase the number of total failures
allowed.

## Summary of changes IV
Remove the test on the PS getpage HTTP route.

---------

Co-authored-by: Chen Luo <chen.luo@databricks.com>
Co-authored-by: Yecheng Yang <carlton.yang@databricks.com>
Co-authored-by: Vlad Lazar <vlad@neon.tech>
2025-07-11 19:27:55 +00:00
Matthias van de Meent
4566b12a22 NEON: Finish Zenith->Neon rename (#12566)
Even though we're now part of Databricks, let's at least make this part
consistent.

## Summary of changes

- PG14: https://github.com/neondatabase/postgres/pull/669
- PG15: https://github.com/neondatabase/postgres/pull/670
- PG16: https://github.com/neondatabase/postgres/pull/671
- PG17: https://github.com/neondatabase/postgres/pull/672

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-07-11 18:56:39 +00:00
Alex Chi Z.
63ca084696 fix(pageserver): downgrade wal apply error during gc-compaction (#12518)
## Problem

close LKB-162

close https://github.com/neondatabase/cloud/issues/30665, related to
https://github.com/neondatabase/cloud/issues/29434

We see a lot of errors like:

```
2025-05-22T23:06:14.928959Z ERROR compaction_loop{tenant_id=? shard_id=0304}:run:gc_compact_timeline{timeline_id=?}: error applying 4 WAL records 35/DC0DF0B8..3B/E43188C0 (8119 bytes) to key 000000067F0000400500006027000000B9D0, from base image with LSN 0/0 to reconstruct page image at LSN 61/150B9B20 n_attempts=0: apply_wal_records

Caused by:
    0: read walredo stdout
    1: early eof
```

which is an acceptable form of error, so we should downgrade it to a
warning.

## Summary of changes

A walredo error during gc-compaction is expected when the data below the
GC horizon does not contain a full key history. This can happen in rare cases
where GC is only able to remove data in the middle of the history, but not
all of the earlier history, when a full keyspace gets deleted.
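
A hedged illustration of the downgrade; the function and its call sites are hypothetical, not the actual pageserver code:

```
/// Log a walredo failure at the appropriate level: the error is expected
/// during gc-compaction when the key history below the GC horizon is incomplete.
fn log_walredo_failure(during_gc_compaction: bool, err: &anyhow::Error) {
    if during_gc_compaction {
        tracing::warn!("error applying WAL records during gc-compaction: {err:#}");
    } else {
        tracing::error!("error applying WAL records: {err:#}");
    }
}
```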

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-07-11 18:37:55 +00:00
Arpad Müller
379259bdd7 storcon: don't error log on timeline delete if tenant migration is in progress (#12523)
Fixes [LKB-61](https://databricks.atlassian.net/browse/LKB-61):
`test_timeline_archival_chaos` being flaky with storcon error `Requested
tenant is missing`.

When a tenant migration is ongoing and the attach request has been sent
to the new location, but the attach hasn't finished yet, it is possible
for the pageserver to return a 412 Precondition Failed HTTP error on
timeline deletion, because the deletion is already being sent to the new
location. Previously we would log that via something like:

```
ERROR request{method=DELETE path=/v1/tenant/1f544a11c90d1afd7af9b26e48985a4e/timeline/32818fb3ebf07cb7f06805429d7dee38 request_id=c493c04b-7f33-46d2-8a65-aac8a5516055}: Error processing HTTP request: InternalServerError(Error deleting timeline 32
818fb3ebf07cb7f06805429d7dee38 on 1f544a11c90d1afd7af9b26e48985a4e on node 2 (localhost): pageserver API: Precondition failed: Requested tenant is missing
```

This patch changes that and makes us return a more reasonable resource
unavailable error. Not sure how scalable this is for tenants with a
large number of shards, but that's a different discussion (we'd probably
need a limited number of per-storcon retries).

example
[link](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-12398/15981821532/index.html#/testresult/e7785dfb1238d92f).
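
Roughly the mapping described above, sketched with a hypothetical error type (the actual storcon error enum differs):

```
use http::StatusCode;

/// Illustrative error type; the actual storcon error enum differs.
enum DeleteTimelineError {
    /// Retryable: the shard is mid-migration and not where we expected it.
    ResourceUnavailable(String),
    Internal(String),
}

fn map_pageserver_delete_error(status: StatusCode, detail: &str) -> DeleteTimelineError {
    if status == StatusCode::PRECONDITION_FAILED {
        // 412 while an attach to the new location is in flight: surface as transient.
        DeleteTimelineError::ResourceUnavailable(format!("tenant is being migrated: {detail}"))
    } else {
        DeleteTimelineError::Internal(detail.to_string())
    }
}
```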
2025-07-11 17:07:14 +00:00
Heikki Linnakangas
3300207523 Update working set size estimate without lock (#12570)
Update the WSS estimate before acquiring the lock, so that we don't need
to hold the lock for so long. That seems safe to me, see the added comment.

I was planning to do this with the new rust-based communicator
implementation anyway, but it might help a little with the current C
implementation too. And more importantly, having this as a separate PR
gives us a chance to review this aspect independently.
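
A minimal sketch of the pattern in Rust (the actual change is in the C extension code; names here are illustrative):

```
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};

struct WorkingSetState {
    /// Updated without the lock; a slightly stale value is acceptable.
    estimate_pages: AtomicU64,
    /// Stands in for whatever state still needs mutual exclusion.
    rest: Mutex<()>,
}

fn observe_access(state: &WorkingSetState, new_estimate: u64) {
    // Update the estimate first, outside the critical section.
    state.estimate_pages.store(new_estimate, Ordering::Relaxed);

    // The lock is now held only for the remaining shared-state update.
    let _guard = state.rest.lock().unwrap();
    // ... shorter critical section ...
}
```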
2025-07-11 16:05:22 +00:00
Tristan Partin
a0a7733b5a Use relative paths in submodule URL references (#12559)
This is a nifty trick from the hadron repo that seems to help with the SSH
key dance.

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
2025-07-11 15:57:50 +00:00
Conrad Ludgate
f4245403b3 [proxy] allow testing query cancellation locally (#12568)
## Problem

Cancellation requires redis; redis required the control plane.

## Summary of changes

Make redis for cancellation not require control plane.
Add instructions for setting up redis locally.
2025-07-11 15:13:36 +00:00
Heikki Linnakangas
a8db7ebffb Minor refactor of the SQL functions to get working set size estimate (#12550)
Split the code in two: one internal function that calculates the estimate,
and two SQL-callable functions that expose it.

This is in preparation for adding the new communicator implementation. With
that, the SQL functions will dispatch the call to the old or new
implementation depending on which is being used.
2025-07-11 14:17:44 +00:00
Vlad Lazar
154f6dc59c pageserver: log only on final shard resolution failure (#12565)
This log is too noisy. Instead of warning on every retry, let's log only
on the final failure.
2025-07-11 13:25:25 +00:00
Vlad Lazar
15f633922a pageserver: use image consistent LSN for force image layer creation (#12547)
This is a no-op for the neon deployment

* Introduce the concept of the image consistent LSN: the largest LSN below
which all pages have been redone successfully (see the sketch after this list)
* Use the image consistent LSN for forced image layer creations
* Optionally expose the image consistent LSN via the timeline describe
HTTP endpoint
* Add a sharded timeline describe endpoint to storcon
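
One plausible way to aggregate per-shard values for the sharded describe endpoint, with plain u64 standing in for the repo's Lsn type; the minimum-based aggregation here is an assumption:

```
/// Combine per-shard image-consistent LSNs into a timeline-wide value:
/// take the minimum, returning None if any shard has not reported one yet.
fn timeline_image_consistent_lsn(per_shard: &[Option<u64>]) -> Option<u64> {
    per_shard
        .iter()
        .copied()
        .collect::<Option<Vec<u64>>>()?
        .into_iter()
        .min()
}
```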

---------

Co-authored-by: Chen Luo <chen.luo@databricks.com>
2025-07-11 11:39:51 +00:00
Dmitrii Kovalkov
c34d36d8a2 storcon_cli: timeline-safekeeper-migrate and timeline-locate subcommands (#12548)
## Problem
We have a `safekeeper_migrate` handler, but no subcommand in
`storcon_cli`. The same goes for `/:timeline_id/locate`, which identifies the
current set of safekeepers.

- Closes: https://github.com/neondatabase/neon/issues/12395

## Summary of changes
- Add `timeline-safekeeper-migrate` and `timeline-locate` subcommands to
`storcon_cli` (example invocations below)
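
Assuming the binary is invoked as `storcon_cli`, usage would look roughly like this (flag names match the clap definitions added in this PR):

```
storcon_cli timeline-locate --tenant-id <TENANT_ID> --timeline-id <TIMELINE_ID>
storcon_cli timeline-safekeeper-migrate --tenant-id <TENANT_ID> \
    --timeline-id <TIMELINE_ID> --new-sk-set 1,2,3
```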
2025-07-11 10:49:37 +00:00
85 changed files with 1586 additions and 2843 deletions

8
.gitmodules vendored
View File

@@ -1,16 +1,16 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_14_STABLE_neon
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_15_STABLE_neon
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_16_STABLE_neon
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_17_STABLE_neon

329
Cargo.lock generated
View File

@@ -687,40 +687,13 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core 0.5.0",
"axum-core",
"base64 0.22.1",
"bytes",
"form_urlencoded",
@@ -731,7 +704,7 @@ dependencies = [
"hyper 1.4.1",
"hyper-util",
"itoa",
"matchit 0.8.4",
"matchit",
"memchr",
"mime",
"percent-encoding",
@@ -751,26 +724,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 1.0.1",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.0"
@@ -797,8 +750,8 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
dependencies = [
"axum 0.8.1",
"axum-core 0.5.0",
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
@@ -1009,24 +962,6 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 2.8.0",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.100",
]
[[package]]
name = "bindgen"
version = "0.71.1"
@@ -1386,7 +1321,7 @@ dependencies = [
"aws-sdk-kms",
"aws-sdk-s3",
"aws-smithy-types",
"axum 0.8.1",
"axum",
"axum-extra",
"base64 0.22.1",
"bytes",
@@ -1401,10 +1336,8 @@ dependencies = [
"hostname-validator",
"http 1.1.0",
"indexmap 2.9.0",
"inferno 0.12.0",
"itertools 0.10.5",
"jsonwebtoken",
"libproc",
"metrics",
"nix 0.30.1",
"notify",
@@ -1418,8 +1351,6 @@ dependencies = [
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"pprof 0.15.0",
"prost 0.12.6",
"regex",
"remote_storage",
"reqwest",
@@ -1431,7 +1362,6 @@ dependencies = [
"serde_with",
"signal-hook",
"tar",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tokio-postgres",
@@ -2152,7 +2082,7 @@ name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum 0.8.1",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
@@ -2268,12 +2198,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.13"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"libc",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -2603,18 +2533,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
]
[[package]]
name = "gettid"
version = "0.1.3"
@@ -2872,15 +2790,6 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -2990,7 +2899,7 @@ dependencies = [
"jsonwebtoken",
"metrics",
"once_cell",
"pprof 0.14.0",
"pprof",
"regex",
"routerify",
"rustls 0.23.27",
@@ -3652,7 +3561,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin 0.9.8",
"spin",
]
[[package]]
@@ -3677,17 +3586,6 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "libproc"
version = "0.14.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb"
dependencies = [
"bindgen 0.70.1",
"errno",
"libc",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
@@ -3700,12 +3598,6 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0b5399f6804fbab912acbd8878ed3532d506b7c951b8f9f164ef90fef39e3f4"
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]]
name = "litemap"
version = "0.7.4"
@@ -3759,12 +3651,6 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
@@ -4504,7 +4390,7 @@ dependencies = [
"postgres_ffi_types",
"postgres_initdb",
"posthog_client_lite",
"pprof 0.14.0",
"pprof",
"pq_proto",
"procfs",
"rand 0.8.5",
@@ -5079,7 +4965,7 @@ name = "postgres_ffi"
version = "0.1.0"
dependencies = [
"anyhow",
"bindgen 0.71.1",
"bindgen",
"bytes",
"crc32c",
"criterion",
@@ -5090,7 +4976,7 @@ dependencies = [
"postgres",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof 0.14.0",
"pprof",
"regex",
"serde",
"thiserror 1.0.69",
@@ -5180,30 +5066,6 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "pprof"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"findshlibs",
"inferno 0.11.21",
"libc",
"log",
"nix 0.26.4",
"once_cell",
"protobuf",
"protobuf-codegen",
"smallvec",
"spin 0.10.0",
"symbolic-demangle",
"tempfile",
"thiserror 2.0.11",
]
[[package]]
name = "pprof_util"
version = "0.7.0"
@@ -5279,7 +5141,7 @@ dependencies = [
"hex",
"lazy_static",
"procfs-core",
"rustix 0.38.41",
"rustix",
]
[[package]]
@@ -5415,57 +5277,6 @@ dependencies = [
"prost 0.13.5",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-codegen"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
dependencies = [
"anyhow",
"once_cell",
"protobuf",
"protobuf-parse",
"regex",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-parse"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
dependencies = [
"anyhow",
"indexmap 2.9.0",
"log",
"protobuf",
"protobuf-support",
"tempfile",
"thiserror 1.0.69",
"which",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "proxy"
version = "0.1.0"
@@ -5478,6 +5289,7 @@ dependencies = [
"async-trait",
"atomic-take",
"aws-config",
"aws-credential-types",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.22.1",
@@ -5517,6 +5329,7 @@ dependencies = [
"itoa",
"jose-jwa",
"jose-jwk",
"json",
"lasso",
"measured",
"metrics",
@@ -5637,12 +5450,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.7.3"
@@ -6031,7 +5838,7 @@ dependencies = [
"async-trait",
"getrandom 0.2.11",
"http 1.1.0",
"matchit 0.8.4",
"matchit",
"opentelemetry",
"reqwest",
"reqwest-middleware",
@@ -6222,19 +6029,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
dependencies = [
"bitflags 2.8.0",
"errno",
"libc",
"linux-raw-sys 0.9.4",
"windows-sys 0.59.0",
]
[[package]]
name = "rustls"
version = "0.21.12"
@@ -6417,7 +6211,7 @@ dependencies = [
"postgres_backend",
"postgres_ffi",
"postgres_versioninfo",
"pprof 0.14.0",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -7005,18 +6799,6 @@ name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spin"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
dependencies = [
"lock_api",
]
[[package]]
name = "spinning_top"
@@ -7211,6 +6993,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"reqwest",
"safekeeper_api",
"serde_json",
"storage_controller_client",
"tokio",
@@ -7377,14 +7160,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.20.0"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
dependencies = [
"cfg-if",
"fastrand 2.2.0",
"getrandom 0.3.3",
"once_cell",
"rustix 1.0.7",
"rustix",
"windows-sys 0.59.0",
]
@@ -7878,25 +7661,16 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum 0.7.9",
"base64 0.22.1",
"bytes",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"socket2",
"tokio",
"tokio-stream",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@@ -7909,7 +7683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum 0.8.1",
"axum",
"base64 0.22.1",
"bytes",
"flate2",
@@ -7970,16 +7744,11 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -8411,7 +8180,7 @@ dependencies = [
"pem",
"pin-project-lite",
"postgres_connection",
"pprof 0.14.0",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -8463,7 +8232,7 @@ name = "vm_monitor"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.8.1",
"axum",
"cgroups-rs",
"clap",
"futures",
@@ -8518,7 +8287,7 @@ dependencies = [
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"pprof 0.14.0",
"pprof",
"prost 0.13.5",
"remote_storage",
"serde",
@@ -8548,7 +8317,7 @@ name = "walproposer"
version = "0.1.0"
dependencies = [
"anyhow",
"bindgen 0.71.1",
"bindgen",
"postgres_ffi",
"utils",
]
@@ -8575,15 +8344,6 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.14.2+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasite"
version = "0.1.0"
@@ -8713,18 +8473,6 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.41",
]
[[package]]
name = "whoami"
version = "1.5.1"
@@ -8953,15 +8701,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "workspace_hack"
version = "0.1.0"
@@ -8969,19 +8708,17 @@ dependencies = [
"ahash",
"anstream",
"anyhow",
"axum 0.8.1",
"axum-core 0.5.0",
"axum",
"axum-core",
"base64 0.21.7",
"base64ct",
"bytes",
"camino",
"cc",
"chrono",
"clang-sys",
"clap",
"clap_builder",
"const-oid",
"criterion",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
@@ -9008,7 +8745,6 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 1.9.3",
"indexmap 2.9.0",
"itertools 0.12.1",
"lazy_static",
@@ -9025,6 +8761,7 @@ dependencies = [
"num-iter",
"num-rational",
"num-traits",
"once_cell",
"p256 0.13.2",
"parquet",
"prettyplease",
@@ -9046,7 +8783,6 @@ dependencies = [
"sha2",
"signature 2.2.0",
"smallvec",
"spin 0.9.8",
"spki 0.7.3",
"stable_deref_trait",
"subtle",
@@ -9061,7 +8797,6 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tonic 0.12.3",
"tower 0.5.2",
"tracing",
"tracing-core",

View File

@@ -130,7 +130,6 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
libproc = "0.14"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
@@ -279,7 +278,6 @@ safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }
tempfile = "3"
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }

View File

@@ -109,8 +109,6 @@ RUN set -e \
libreadline-dev \
libseccomp-dev \
ca-certificates \
bpfcc-tools \
sudo \
openssl \
unzip \
curl \

View File

@@ -61,9 +61,6 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
libpq5 \
libpq-dev \
libzstd-dev \
linux-perf \
bpfcc-tools \
linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
postgresql-16 \
postgresql-server-dev-16 \
postgresql-common \
@@ -108,21 +105,15 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
#
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -ex \
&& KERNEL_VERSION="$(uname -r | cut -d'-' -f1 | sed 's/\.0$//')" \
&& echo KERNEL_VERSION=${KERNEL_VERSION} >> /etc/environment \
&& KERNEL_ARCH=$(uname -m | awk '{ if ($1 ~ /^(x86_64|i[3-6]86)$/) print "x86"; else if ($1 ~ /^(aarch64|arm.*)$/) print "aarch"; else print $1 }') \
&& echo KERNEL_ARCH=${KERNEL_ARCH} >> /etc/environment \
RUN set -e \
&& apt update \
&& apt install -y \
autoconf \
automake \
bc \
bison \
build-essential \
ca-certificates \
cmake \
cpio \
curl \
flex \
gdb \
@@ -131,10 +122,8 @@ RUN set -ex \
gzip \
jq \
jsonnet \
kmod \
libcurl4-openssl-dev \
libbz2-dev \
libelf-dev \
libffi-dev \
liblzma-dev \
libncurses5-dev \
@@ -148,11 +137,6 @@ RUN set -ex \
libxml2-dev \
libxmlsec1-dev \
libxxhash-dev \
linux-perf \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
lsof \
make \
netcat-openbsd \
@@ -160,8 +144,6 @@ RUN set -ex \
openssh-client \
parallel \
pkg-config \
rsync \
sudo \
unzip \
wget \
xz-utils \
@@ -216,8 +198,6 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /
# Configure sudo & docker
RUN usermod -aG sudo nonroot && \
echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
mkdir -p /etc/sudoers.d && \
echo 'nonroot ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/nonroot && \
usermod -aG docker nonroot
# AWS CLI

View File

@@ -149,9 +149,6 @@ RUN case $DEBIAN_VERSION in \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
@@ -1991,10 +1988,6 @@ RUN apt update && \
locales \
lsof \
procps \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
libclang-dev \
rsyslog-gnutls \
screen \
tcpdump \

View File

@@ -39,14 +39,6 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: enable-kernel-modules
user: root
sysvInitAction: sysinit
shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
- name: enable-bpfs
user: root
sysvInitAction: sysinit
shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
# use a different path for the socket. The symlink actually points to our custom path.
@@ -73,7 +65,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -160,8 +152,6 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf

View File

@@ -39,14 +39,6 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: enable-kernel-modules
user: root
sysvInitAction: sysinit
shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
- name: enable-bpfs
user: root
sysvInitAction: sysinit
shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
# use a different path for the socket. The symlink actually points to our custom path.
@@ -73,7 +65,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -156,8 +148,6 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf
RUN mkdir /var/log/rsyslog && chown -R postgres /var/log/rsyslog

View File

@@ -31,7 +31,6 @@ hostname-validator = "1.1"
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
libproc.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true
@@ -50,7 +49,6 @@ serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
tempfile.workspace = true
tower.workspace = true
tower-http.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
@@ -80,10 +78,3 @@ zstd = "0.13"
bytes = "1.0"
rust-ini = "0.20.0"
rlimit = "0.10.1"
inferno = { version = "0.12", default-features = false, features = [
"multithreaded",
"nameattr",
] }
pprof = { version = "0.15", features = ["protobuf-codec", "flamegraph"] }
prost = "0.12"

View File

@@ -371,9 +371,7 @@ fn maybe_cgexec(cmd: &str) -> Command {
}
}
/// A handle to the Postgres process that is running in the compute
/// node.
pub struct PostgresHandle {
struct PostgresHandle {
postgres: std::process::Child,
log_collector: JoinHandle<Result<()>>,
}
@@ -1042,6 +1040,8 @@ impl ComputeNode {
PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
};
self.fix_zenith_signal_neon_signal()?;
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros =
connected.duration_since(started).as_micros() as u64;
@@ -1051,6 +1051,27 @@ impl ComputeNode {
Ok(())
}
/// Move the Zenith signal file to Neon signal file location.
/// This makes Compute compatible with older PageServers that don't yet
/// know about the Zenith->Neon rename.
fn fix_zenith_signal_neon_signal(&self) -> Result<()> {
let datadir = Path::new(&self.params.pgdata);
let neonsig = datadir.join("neon.signal");
if neonsig.is_file() {
return Ok(());
}
let zenithsig = datadir.join("zenith.signal");
if zenithsig.is_file() {
fs::copy(zenithsig, neonsig)?;
}
Ok(())
}
/// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
/// the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {

View File

@@ -15,7 +15,6 @@ pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod profile;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,217 +0,0 @@
//! Contains the route for profiling the compute.
//!
//! Profiling the compute means generating a pprof profile of the
//! postgres processes.
//!
//! The profiling is done using the `perf` tool, which is expected to be
//! available somewhere in `$PATH`.
use std::sync::atomic::Ordering;
use axum::Json;
use axum::response::IntoResponse;
use http::StatusCode;
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use crate::http::JsonResponse;
static CANCEL_CHANNEL: Lazy<Mutex<Option<tokio::sync::broadcast::Sender<()>>>> =
Lazy::new(|| Mutex::new(None));
fn default_sampling_frequency() -> u16 {
100
}
fn default_timeout_seconds() -> u8 {
5
}
fn deserialize_sampling_frequency<'de, D>(deserializer: D) -> Result<u16, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
const MIN_SAMPLING_FREQUENCY: u16 = 1;
const MAX_SAMPLING_FREQUENCY: u16 = 1000;
let value = u16::deserialize(deserializer)?;
if !(MIN_SAMPLING_FREQUENCY..=MAX_SAMPLING_FREQUENCY).contains(&value) {
return Err(serde::de::Error::custom(format!(
"sampling_frequency must be between {MIN_SAMPLING_FREQUENCY} and {MAX_SAMPLING_FREQUENCY}, got {value}"
)));
}
Ok(value)
}
fn deserialize_profiling_timeout<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
const MIN_TIMEOUT_SECONDS: u8 = 1;
const MAX_TIMEOUT_SECONDS: u8 = 60;
let value = u8::deserialize(deserializer)?;
if !(MIN_TIMEOUT_SECONDS..=MAX_TIMEOUT_SECONDS).contains(&value) {
return Err(serde::de::Error::custom(format!(
"timeout_seconds must be between {MIN_TIMEOUT_SECONDS} and {MAX_TIMEOUT_SECONDS}, got {value}"
)));
}
Ok(value)
}
/// Request parameters for profiling the compute.
#[derive(Debug, Clone, serde::Deserialize)]
pub(in crate::http) struct ProfileRequest {
/// The profiling tool to use, currently only `perf` is supported.
profiler: crate::profiling::ProfileGenerator,
#[serde(default = "default_sampling_frequency")]
#[serde(deserialize_with = "deserialize_sampling_frequency")]
sampling_frequency: u16,
#[serde(default = "default_timeout_seconds")]
#[serde(deserialize_with = "deserialize_profiling_timeout")]
timeout_seconds: u8,
#[serde(default)]
archive: bool,
}
/// The HTTP request handler for reporting the profiling status of
/// the compute.
pub(in crate::http) async fn profile_status() -> impl IntoResponse {
tracing::info!("Profile status request received.");
let cancel_channel = CANCEL_CHANNEL.lock().await;
if let Some(tx) = cancel_channel.as_ref() {
if tx.receiver_count() > 0 {
return JsonResponse::create_response(
StatusCode::OK,
"Profiling is currently in progress.",
);
}
}
JsonResponse::create_response(StatusCode::NO_CONTENT, "Profiling is not in progress.")
}
/// The HTTP request handler for stopping profiling the compute.
pub(in crate::http) async fn profile_stop() -> impl IntoResponse {
tracing::info!("Profile stop request received.");
match CANCEL_CHANNEL.lock().await.take() {
Some(tx) => {
if tx.send(()).is_err() {
tracing::error!("Failed to send cancellation signal.");
return JsonResponse::create_response(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to send cancellation signal",
);
}
JsonResponse::create_response(StatusCode::OK, "Profiling stopped successfully.")
}
None => JsonResponse::create_response(
StatusCode::PRECONDITION_FAILED,
"Profiling is not in progress, there is nothing to stop.",
),
}
}
/// The HTTP request handler for starting profiling the compute.
pub(in crate::http) async fn profile_start(
Json(request): Json<ProfileRequest>,
) -> impl IntoResponse {
tracing::info!("Profile start request received: {request:?}");
let tx = tokio::sync::broadcast::Sender::<()>::new(1);
{
let mut cancel_channel = CANCEL_CHANNEL.lock().await;
if cancel_channel.is_some() {
return JsonResponse::create_response(
StatusCode::CONFLICT,
"Profiling is already in progress.",
);
}
*cancel_channel = Some(tx.clone());
}
tracing::info!("Profiling will start with parameters: {request:?}");
let pg_pid = Pid::from_raw(crate::compute::PG_PID.load(Ordering::SeqCst) as _);
let run_with_sudo = !cfg!(feature = "testing");
let options = crate::profiling::ProfileGenerationOptions {
profiler: request.profiler,
run_with_sudo,
pids: [pg_pid].into_iter().collect(),
follow_forks: true,
sampling_frequency: request.sampling_frequency as u32,
blocklist_symbols: vec![
"libc".to_owned(),
"libgcc".to_owned(),
"pthread".to_owned(),
"vdso".to_owned(),
],
archive: request.archive,
};
let options = crate::profiling::ProfileGenerationTaskOptions {
options,
timeout: std::time::Duration::from_secs(request.timeout_seconds as u64),
should_stop: Some(tx),
};
let pprof_data = crate::profiling::generate_pprof_profile(options).await;
if CANCEL_CHANNEL.lock().await.take().is_none() {
tracing::error!("Profiling was cancelled from another request.");
return JsonResponse::create_response(
StatusCode::NO_CONTENT,
"Profiling was cancelled from another request.",
);
}
let pprof_data = match pprof_data {
Ok(data) => data,
Err(e) => {
tracing::error!(error = ?e, "failed to generate pprof data");
return JsonResponse::create_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to generate pprof data: {e:?}"),
);
}
};
tracing::info!("Profiling has completed successfully.");
let mut headers = http::HeaderMap::new();
if request.archive {
headers.insert(
http::header::CONTENT_TYPE,
http::HeaderValue::from_static("application/gzip"),
);
headers.insert(
http::header::CONTENT_DISPOSITION,
http::HeaderValue::from_static("attachment; filename=\"profile.pb.gz\""),
);
} else {
headers.insert(
http::header::CONTENT_TYPE,
http::HeaderValue::from_static("application/octet-stream"),
);
headers.insert(
http::header::CONTENT_DISPOSITION,
http::HeaderValue::from_static("attachment; filename=\"profile.pb\""),
);
}
(headers, pprof_data.0).into_response()
}

View File

@@ -27,7 +27,6 @@ use super::{
},
};
use crate::compute::ComputeNode;
use crate::http::routes::profile;
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
@@ -82,14 +81,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Server::External {
config, compute_id, ..
} => {
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/profile/cpu",
get(profile::profile_status)
.post(profile::profile_start)
.delete(profile::profile_stop),
);
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))

View File

@@ -24,7 +24,6 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub mod profiling;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

File diff suppressed because it is too large

View File

@@ -36,7 +36,7 @@ impl StorageBroker {
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
println!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();

View File

@@ -32,7 +32,8 @@
//! config.json - passed to `compute_ctl`
//! pgdata/
//! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
//! zenith.signal
//! neon.signal
//! zenith.signal - copy of neon.signal, for backward compatibility
//! <other PostgreSQL files>
//! ```
//!

View File

@@ -217,6 +217,9 @@ pub struct NeonStorageControllerConf {
pub posthog_config: Option<PostHogConfig>,
pub kick_secondary_downloads: Option<bool>,
#[serde(with = "humantime_serde")]
pub shard_split_request_timeout: Option<Duration>,
}
impl NeonStorageControllerConf {
@@ -250,6 +253,7 @@ impl Default for NeonStorageControllerConf {
timeline_safekeeper_count: None,
posthog_config: None,
kick_secondary_downloads: None,
shard_split_request_timeout: None,
}
}
}

View File

@@ -303,7 +303,7 @@ impl PageServerNode {
async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
println!(
"Starting pageserver node {} at '{}' in {:?}, retrying for {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),

View File

@@ -127,7 +127,7 @@ impl SafekeeperNode {
extra_opts: &[String],
retry_timeout: &Duration,
) -> anyhow::Result<()> {
print!(
println!(
"Starting safekeeper at '{}' in '{}', retrying for {:?}",
self.pg_connection_config.raw_address(),
self.datadir_path().display(),

View File

@@ -648,6 +648,13 @@ impl StorageController {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
if let Some(duration) = self.config.shard_split_request_timeout {
args.push(format!(
"--shard-split-request-timeout={}",
humantime::Duration::from(duration)
));
}
let mut envs = vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
@@ -660,7 +667,7 @@ impl StorageController {
));
}
println!("Starting storage controller");
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
background_process::start_process(
COMMAND,

View File

@@ -14,6 +14,7 @@ humantime.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
safekeeper_api.workspace=true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
tokio.workspace = true

View File

@@ -11,7 +11,7 @@ use pageserver_api::controller_api::{
PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
@@ -21,6 +21,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Certificate, Method, StatusCode, Url};
use safekeeper_api::models::TimelineLocateResponse;
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -279,6 +280,23 @@ enum Command {
#[arg(long)]
concurrency: Option<usize>,
},
/// Locate safekeepers for a timeline from the storcon DB.
TimelineLocate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
/// Migrate a timeline to a new set of safekeepers
TimelineSafekeeperMigrate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
/// Example: --new-sk-set 1,2,3
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
}
#[derive(Parser)]
@@ -1324,7 +1342,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
"v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1335,6 +1353,41 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
Command::TimelineLocate {
tenant_id,
timeline_id,
} => {
let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
let resp = storcon_client
.dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
.await?;
let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_sk_set = resp
.new_sk_set
.as_ref()
.map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
println!("generation = {}", resp.generation);
println!("sk_set = {sk_set:?}");
println!("new_sk_set = {new_sk_set:?}");
}
Command::TimelineSafekeeperMigrate {
tenant_id,
timeline_id,
new_sk_set,
} => {
let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
storcon_client
.dispatch::<_, ()>(
Method::POST,
path,
Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
)
.await?;
}
}
Ok(())

View File

@@ -1,58 +0,0 @@
# Continuous Profiling (Compute)
The continuous profiling of the compute node is performed by `perf` or `bcc-tools`; the latter is preferred.
The executables profiled are only the postgres-related ones, excluding the actual compute code (Rust). That could be done as well but
was not the main goal.
## Tools
The aforementioned tools are available within the same Docker image as
the compute node itself, but the corresponding dependencies, like the
Linux kernel headers and the Linux kernel itself, are not and can't be,
for obvious reasons. To solve that, since we run the compute nodes as
virtual machines (qemu), we need to deliver these dependencies to them.
This is done by the `autoscaling` part, which builds and deploys the
kernel headers, needed modules, and the `perf` binary into an ext4-fs
disk image, which is later attached to the VM and is symlinked to be
made available for the compute node.
## Output
The output of the profiling is always a binary file in the `pprof` format.
It can additionally be compressed with `gzip` if the corresponding argument
is provided in the JSON request.
## REST API
### Test profiling
One can test the profiling after connecting to the VM and running:
```sh
curl -X POST -H "Content-Type: application/json" http://localhost:3080/profile/cpu -d '{"profiler": {"BccProfile": null}, "sampling_frequency": 99, "timeout_seconds": 5, "archive": false}' -v --output profile.pb
```
This uses the `Bcc` profiler and does not archive the output. The
profiling data will be saved into the `profile.pb` file locally.
**Only one profiling session can be run at a time.**
To check the profiling status (to see whether it is already running or
not), one can perform the `GET` request:
```sh
curl http://localhost:3080/profile/cpu -v
```
The profiling can be stopped by performing the `DELETE` request:
```sh
curl -X DELETE http://localhost:3080/profile/cpu -v
```
## Supported profiling data
For now, only CPU profiling is done and there is no heap profiling.
Also, only the postgres-related executables are tracked; the compute
(Rust) part itself **is not tracked**.

View File

@@ -129,9 +129,10 @@ segment to bootstrap the WAL writing, but it doesn't contain the checkpoint reco
changes in xlog.c, to allow starting the compute node without reading the last checkpoint record
from WAL.
This includes code to read the `zenith.signal` file, which tells the startup code the LSN to start
at. When the `zenith.signal` file is present, the startup uses that LSN instead of the last
checkpoint's LSN. The system is known to be consistent at that LSN, without any WAL redo.
This includes code to read the `neon.signal` (also `zenith.signal`) file, which tells the startup
code the LSN to start at. When the `neon.signal` file is present, the startup uses that LSN
instead of the last checkpoint's LSN. The system is known to be consistent at that LSN, without
any WAL redo.
### How to get rid of the patch

View File

@@ -6,8 +6,8 @@ license.workspace = true
[dependencies]
thiserror.workspace = true
nix.workspace = true
nix.workspace=true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.20.0"
tempfile = "3.14.0"

View File

@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo};
use crate::shard::{ShardStripeSize, TenantShardId};
#[derive(Serialize, Deserialize, Debug)]
@@ -126,6 +126,13 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantTimelineDescribeResponse {
pub shards: Vec<TimelineInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,

View File

@@ -1622,6 +1622,9 @@ pub struct TimelineInfo {
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,
// HADRON: the largest LSN below which all page updates have been included in the image layers.
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -31,6 +31,7 @@ pub struct UnreliableWrapper {
/* BEGIN_HADRON */
// This the probability of failure for each operation, ranged from [0, 100].
// The probability is default to 100, which means that all operations will fail.
// Storage will fail by probability up to attempts_to_fail times.
attempt_failure_probability: u64,
/* END_HADRON */
}

View File

@@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::Configuration;
use crate::membership::{Configuration, SafekeeperGeneration};
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize, Deserialize)]
@@ -311,3 +311,12 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
/// Response to a timeline locate request.
/// Storcon-only API.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}

View File

@@ -47,6 +47,7 @@ where
/* BEGIN_HADRON */
pub enum DeploymentMode {
Local,
Dev,
Staging,
Prod,
@@ -64,7 +65,7 @@ pub fn get_deployment_mode() -> Option<DeploymentMode> {
}
},
Err(_) => {
tracing::error!("DEPLOYMENT_MODE not set");
// tracing::error!("DEPLOYMENT_MODE not set");
None
}
}

View File

@@ -114,7 +114,7 @@ where
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
// "neon.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
@@ -751,34 +751,39 @@ where
//
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
// Also send neon.signal and zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
// add neon.signal file
let mut neon_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.timeline.is_ancestor_lsn(self.lsn) {
write!(zenith_signal, "PREV LSN: none")
write!(neon_signal, "PREV LSN: none")
.map_err(|e| BasebackupError::Server(e.into()))?;
} else {
write!(zenith_signal, "PREV LSN: invalid")
write!(neon_signal, "PREV LSN: invalid")
.map_err(|e| BasebackupError::Server(e.into()))?;
}
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
write!(neon_signal, "PREV LSN: {}", self.prev_record_lsn)
.map_err(|e| BasebackupError::Server(e.into()))?;
}
self.ar
.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
// TODO: Remove zenith.signal once all historical computes have been replaced
// ... and thus support the neon.signal file.
for signalfilename in ["neon.signal", "zenith.signal"] {
self.ar
.append(
&new_tar_header(signalfilename, neon_signal.len() as u64)?,
neon_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,neon.signal"))?;
}
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;

View File

@@ -917,11 +917,6 @@ async fn create_remote_storage_client(
// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.
if conf.test_remote_failures > 0 {
if !cfg!(feature = "testing") {
anyhow::bail!(
"test_remote_failures option is not available because pageserver was compiled without the 'testing' feature"
);
}
info!(
"Simulating remote failures for first {} attempts of each op",
conf.test_remote_failures

View File

@@ -397,6 +397,7 @@ async fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
force_await_initial_logical_size: bool,
include_image_consistent_lsn: bool,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -421,6 +422,10 @@ async fn build_timeline_info(
.await?,
);
}
// HADRON
if include_image_consistent_lsn {
info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?);
}
Ok(info)
}
@@ -510,6 +515,8 @@ async fn build_timeline_info_common(
is_invisible: Some(is_invisible),
walreceiver_status,
// HADRON
image_consistent_lsn: None,
};
Ok(info)
}
@@ -712,6 +719,8 @@ async fn timeline_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -732,6 +741,7 @@ async fn timeline_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -760,6 +770,9 @@ async fn timeline_and_offloaded_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -780,6 +793,7 @@ async fn timeline_and_offloaded_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -964,6 +978,9 @@ async fn timeline_detail_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
// HADRON
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// Logical size calculation needs downloading.
@@ -984,6 +1001,7 @@ async fn timeline_detail_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
ctx,
)
.await
@@ -3643,6 +3661,7 @@ async fn activate_post_import_handler(
let timeline_info = build_timeline_info(
&timeline, false, // include_non_incremental_logical_size,
false, // force_await_initial_logical_size
false, // include_image_consistent_lsn
&ctx,
)
.await
@@ -4164,7 +4183,7 @@ pub fn make_router(
})
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",

View File

@@ -610,13 +610,13 @@ async fn import_file(
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
} else if file_path.starts_with("zenith.signal") || file_path.starts_with("neon.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader).await?;
// zenith.signal format is "PREV LSN: prev_lsn"
// neon.signal format is "PREV LSN: prev_lsn"
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match zenith_signal {
let neon_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match neon_signal {
"PREV LSN: none" => Lsn(0),
"PREV LSN: invalid" => Lsn(0),
other => {
@@ -624,17 +624,17 @@ async fn import_file(
split[1]
.trim()
.parse::<Lsn>()
.context("can't parse zenith.signal")?
.context("can't parse neon.signal")?
}
};
// zenith.signal is not necessarily the last file, that we handle
// neon.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);
debug!("imported neon signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.

View File

@@ -2167,7 +2167,7 @@ impl PageServerHandler {
fn effective_request_lsn(
timeline: &Timeline,
last_record_lsn: Lsn,
request_lsn: Lsn,
mut request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> Result<Lsn, PageStreamError> {
@@ -2195,12 +2195,16 @@ impl PageServerHandler {
if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.lsn_covered_by_lease(request_lsn) {
return Err(
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
// While the request was in flight, the replica's apply_lsn may have advanced.
// latest_gc_cutoff_lsn is a conservative estimate of min(redo_lsn) across all replicas,
// so it is safe to move request_lsn forward to latest_gc_cutoff_lsn.
// If the replica's lease has expired and latest_gc_cutoff_lsn > redo_lsn for this replica,
// the page LSN check at the replica protects it from getting a too-new version of the page.
warn!(
"Tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
);
request_lsn = **latest_gc_cutoff_lsn;
}
}

View File

@@ -12816,6 +12816,40 @@ mod tests {
},
]
);
Ok(())
}
#[tokio::test]
async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> {
let tenant_conf = pageserver_api::models::TenantConfig {
pitr_interval: Some(Duration::from_secs(7 * 3600)),
image_layer_force_creation_period: Some(Duration::from_secs(3600)),
..Default::default()
};
let tenant_id = TenantId::generate();
let harness = TenantHarness::create_custom(
"test_get_force_image_creation_lsn",
tenant_conf,
tenant_id,
ShardIdentity::unsharded(),
Generation::new(1),
)
.await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100));
{
let writer = timeline.writer().await;
writer.finish_write(Lsn(5000));
}
let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap();
assert_eq!(image_creation_lsn, Lsn(4300));
Ok(())
}
}

View File

@@ -46,10 +46,11 @@
mod historic_layer_coverage;
mod layer_coverage;
use std::collections::{HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Result;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
@@ -904,6 +905,103 @@ impl LayerMap {
max_stacked_deltas
}
/* BEGIN_HADRON */
/**
* Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully.
* It works by first finding the latest image layers and storing them in a map. Then, for each delta layer,
* it finds all overlapping image layers in order to potentially raise their effective image LSN when there
* are gaps (e.g., if an image is created at LSN 100 and the overlapping delta layer spans LSN [150, 200],
* the effective image LSN can be raised to 149 because there is no WAL record in between).
* Finally, the image consistent LSN is the minimum of all image layers' effective LSNs.
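*
* A hypothetical worked example (numbers invented for illustration, not part of the original change):
* image A covers keys [a, b) at LSN 100, image B covers [b, c) at LSN 180, a delta layer over [a, b)
* spans LSN [150, 200), and the oldest L0 delta starts at LSN 300. Image A's effective LSN becomes
* max(100, 150 - 1) = 149; image B has no newer overlapping delta, so it stays at max(180, 300 - 1) = 299.
* The image consistent LSN is min(299, 149) = 149 (the running minimum starts at min L0 start - 1 = 299).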
*/
pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn {
struct ImageLayerInfo {
// creation LSN of the image layer
image_lsn: Lsn,
// the current minimum LSN of newer delta layers with overlapping key ranges
min_delta_lsn: Lsn,
}
let started_at = Instant::now();
let min_l0_deltas_lsn = {
let l0_deltas = self.level0_deltas();
l0_deltas
.iter()
.map(|layer| layer.get_lsn_range().start)
.min()
.unwrap_or(disk_consistent_lsn)
};
let global_key_range = Key::MIN..Key::MAX;
// step 1: collect all most recent image layers into a map
// map: end key to image_layer_info
let mut image_map: BTreeMap<Key, ImageLayerInfo> = BTreeMap::new();
for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) {
let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0));
image_map.insert(
img_range.end,
ImageLayerInfo {
image_lsn: img_lsn,
min_delta_lsn: min_l0_deltas_lsn,
},
);
}
// step 2: go through all delta layers, and update the image layer info with overlapping
// key ranges
for layer in self.historic.iter() {
if !layer.is_delta {
continue;
}
let delta_key_range = layer.get_key_range();
let delta_lsn_range = layer.get_lsn_range();
for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) {
debug_assert!(img_end_key >= &delta_key_range.start);
if delta_lsn_range.end > img_info.image_lsn {
// the delta layer includes WAL records after the image
// it's possible that the delta layer's start LSN < image LSN, which is simply ignored by step 3
img_info.min_delta_lsn =
std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start);
}
if img_end_key >= &delta_key_range.end {
// we have fully processed all overlapping image layers
break;
}
}
}
// step 3, go through all image layers and find the image consistent LSN
let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap();
let mut prev_key = Key::MIN;
for (img_key, img_info) in image_map {
tracing::debug!(
"Image layer {:?}:{} has min delta lsn {}",
Range {
start: prev_key,
end: img_key,
},
img_info.image_lsn,
img_info.min_delta_lsn,
);
let image_lsn = std::cmp::max(
img_info.image_lsn,
img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)),
);
img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn);
prev_key = img_key;
}
tracing::info!(
"computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. Processed {} layrs in total.",
img_consistent_lsn,
disk_consistent_lsn,
started_at.elapsed().as_millis(),
self.historic.len()
);
img_consistent_lsn
}
/* END_HADRON */
/// Return all L0 delta layers
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
&self.l0_delta_layers
@@ -1579,6 +1677,138 @@ mod tests {
LayerVisibilityHint::Visible
));
}
/* BEGIN_HADRON */
#[test]
fn test_compute_image_consistent_lsn() {
let mut layer_map = LayerMap::default();
let disk_consistent_lsn = Lsn(1000);
// case 1: empty layer map
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(
disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(),
image_consistent_lsn
);
// case 2: only L0 delta layer
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(900)..Lsn(990),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(850)..Lsn(899),
true,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 3: 3 images, no L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(100)..Lsn(100),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(40)..Key::from_i128(70),
Lsn(200)..Lsn(200),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(70)..Key::from_i128(100),
Lsn(150)..Lsn(150),
false,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 4: 3 images with 1 L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(300)..Lsn(350),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(299), image_consistent_lsn);
// case 5: 3 images with 1 more L1 delta with smaller LSN
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(72),
Lsn(200)..Lsn(300),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 6: 3 images with more newer L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(30),
Lsn(400)..Lsn(500),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(35)..Key::from_i128(100),
Lsn(450)..Lsn(600),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 7: 3 images with more older L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(0)..Lsn(50),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(100),
Lsn(10)..Lsn(60),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 8: 3 images with one more L1 delta with overlapping LSN range
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(50)..Lsn(250),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(100), image_consistent_lsn);
}
/* END_HADRON */
}
#[cfg(test)]

View File

@@ -1678,6 +1678,8 @@ impl TenantManager {
// Phase 6: Release the InProgress on the parent shard
drop(parent_slot_guard);
utils::pausable_failpoint!("shard-split-post-finish-pause");
Ok(child_shards)
}

View File

@@ -351,13 +351,6 @@ pub struct Timeline {
last_image_layer_creation_check_at: AtomicLsn,
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
// HADRON
/// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation
/// on this key range.
force_image_creation_lsn: AtomicLsn,
/// The last time instant when force_image_creation_lsn is computed.
force_image_creation_lsn_computed_at: std::sync::Mutex<Option<Instant>>,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -2854,7 +2847,7 @@ impl Timeline {
}
// HADRON
fn get_image_creation_timeout(&self) -> Option<Duration> {
fn get_image_layer_force_creation_period(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
@@ -3134,9 +3127,6 @@ impl Timeline {
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
// HADRON
force_image_creation_lsn: AtomicLsn::new(0),
force_image_creation_lsn_computed_at: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
@@ -5381,13 +5371,16 @@ impl Timeline {
}
// HADRON
// for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline
min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn());
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
info!(
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}",
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}, num deltas: {}",
partition.ranges[0].start,
partition.ranges[0].end,
min_image_lsn,
force_image_creation_lsn.unwrap()
force_image_creation_lsn.unwrap(),
max_deltas
);
return true;
}
@@ -5611,10 +5604,11 @@ impl Timeline {
/// Predicate function which indicates whether we should check if new image layers
/// are required. Since checking if new image layers are required is expensive in
/// terms of CPU, we only do it in the following cases:
/// 1. If the timeline has ingested sufficient WAL to justify the cost
/// 1. If the timeline has ingested sufficient WAL to justify the cost or ...
/// 2. If enough time has passed since the last check:
/// 1. For large tenants, we wish to perform the check more often since they
/// suffer from the lack of image layers
/// suffer from the lack of image layers. Note that we assume sharded tenants
/// to be large since non-zero shards do not track the logical size.
/// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
@@ -5628,30 +5622,39 @@ impl Timeline {
let distance_based_decision = distance.0 >= min_distance;
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after =
if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
let check_required_after = (|| {
if self.shard_identity.is_unsharded() {
if let CurrentLogicalSize::Exact(logical_size) =
self.current_logical_size.current_size()
{
if Some(Into::<u64>::into(&logical_size)) < large_timeline_threshold {
return Duration::from_secs(3600 * 48);
}
}
None => true,
};
}
}
self.get_checkpoint_timeout()
})();
let time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
}
None => true,
};
// Do the expensive delta layer counting only if this timeline has ingested sufficient
// WAL since the last check or a checkpoint timeout interval has elapsed since the last
// check.
let decision = distance_based_decision || time_based_decision;
tracing::info!(
"Decided to check image layers: {}. Distance-based decision: {}, time-based decision: {}",
decision,
distance_based_decision,
time_based_decision
);
if decision {
self.last_image_layer_creation_check_at.store(lsn);
*last_check_instant = Some(Instant::now());
@@ -7153,6 +7156,19 @@ impl Timeline {
.unwrap()
.clone()
}
/* BEGIN_HADRON */
pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result<Lsn> {
let guard = self
.layers
.read(LayerManagerLockHolder::ComputeImageConsistentLsn)
.await;
let layer_map = guard.layer_map()?;
let disk_consistent_lsn = self.get_disk_consistent_lsn();
Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn))
}
/* END_HADRON */
}
impl Timeline {

View File

@@ -8,7 +8,7 @@ use std::cmp::min;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant};
use super::layer_manager::LayerManagerLockHolder;
use super::{
@@ -34,7 +34,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
use pageserver_compaction::interface::*;
use postgres_ffi::to_pg_timestamp;
use serde::Serialize;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
@@ -47,7 +46,6 @@ use wal_decoder::models::value::Value;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
@@ -1271,10 +1269,7 @@ impl Timeline {
// Define partitioning schema if needed
// HADRON
let force_image_creation_lsn = self
.get_or_compute_force_image_creation_lsn(cancel, ctx)
.await
.map_err(CompactionError::Other)?;
let force_image_creation_lsn = self.get_force_image_creation_lsn();
// 1. L0 Compact
let l0_outcome = {
@@ -1484,59 +1479,37 @@ impl Timeline {
}
/* BEGIN_HADRON */
// Get the force image creation LSN. Compute it if the last computed LSN is too old.
async fn get_or_compute_force_image_creation_lsn(
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Option<Lsn>> {
const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
let image_layer_force_creation_period = self.get_image_creation_timeout();
if image_layer_force_creation_period.is_none() {
return Ok(None);
// Get the force image creation LSN based on gc_cutoff_lsn.
// Note that this is an estimate and the workload rate may change suddenly. When that happens,
// the forced image creation may be too early or too late, but it eventually catches up.
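// Worked example (illustrative; numbers match test_get_force_image_creation_lsn): with
// pitr_interval = 7h, image_layer_force_creation_period = 1h, last_record_lsn = 5000 and
// PITR cutoff = 100, the scaled delta is (5000 - 100) * 3600 / (7 * 3600) = 700,
// so force_image_creation_lsn = 5000 - 700 = 4300.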
pub(crate) fn get_force_image_creation_lsn(self: &Arc<Self>) -> Option<Lsn> {
let image_creation_period = self.get_image_layer_force_creation_period()?;
let current_lsn = self.get_last_record_lsn();
let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?;
let pitr_interval = self.get_pitr_interval();
if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() {
tracing::warn!(
"pitr LSN/interval not found, skipping force image creation LSN calculation"
);
return None;
}
let image_layer_force_creation_period = image_layer_force_creation_period.unwrap();
let force_image_creation_lsn_computed_at =
*self.force_image_creation_lsn_computed_at.lock().unwrap();
if force_image_creation_lsn_computed_at.is_none()
|| force_image_creation_lsn_computed_at.unwrap().elapsed()
> FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL
{
let now: SystemTime = SystemTime::now();
let timestamp = now
.checked_sub(image_layer_force_creation_period)
.ok_or_else(|| {
anyhow::anyhow!(
"image creation timeout is too large: {image_layer_force_creation_period:?}"
)
})?;
let timestamp = to_pg_timestamp(timestamp);
let force_image_creation_lsn = match self
.find_lsn_for_timestamp(timestamp, cancel, ctx)
.await?
{
LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
_ => {
let gc_lsn = *self.get_applied_gc_cutoff_lsn();
tracing::info!(
"no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}",
gc_lsn
);
gc_lsn
}
};
self.force_image_creation_lsn
.store(force_image_creation_lsn);
*self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now());
tracing::info!(
"computed force image creation LSN: {}",
force_image_creation_lsn
);
Ok(Some(force_image_creation_lsn))
} else {
Ok(Some(self.force_image_creation_lsn.load()))
}
let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0
* image_creation_period.as_secs()
/ pitr_interval.as_secs();
let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0));
tracing::info!(
"Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}",
self.tenant_shard_id,
force_image_creation_lsn,
current_lsn,
image_creation_period,
pitr_lsn,
pitr_interval
);
Some(force_image_creation_lsn)
}
/* END_HADRON */

View File

@@ -359,14 +359,14 @@ impl<T: Types> Cache<T> {
Err(e) => {
// Retry on tenant manager error to handle tenant split more gracefully
if attempt < GET_MAX_RETRIES {
tracing::warn!(
"Fail to resolve tenant shard in attempt {}: {:?}. Retrying...",
attempt,
e
);
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
tracing::warn!(
"Failed to resolve tenant shard after {} attempts: {:?}",
GET_MAX_RETRIES,
e
);
return Err(e);
}
}

View File

@@ -47,6 +47,7 @@ pub(crate) enum LayerManagerLockHolder {
ImportPgData,
DetachAncestor,
Eviction,
ComputeImageConsistentLsn,
#[cfg(test)]
Testing,
}

View File

@@ -147,6 +147,16 @@ pub enum RedoAttemptType {
GcCompaction,
}
impl std::fmt::Display for RedoAttemptType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RedoAttemptType::ReadPage => write!(f, "read page"),
RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"),
RedoAttemptType::GcCompaction => write!(f, "gc compaction"),
}
}
}
///
/// Public interface of WAL redo manager
///
@@ -199,6 +209,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
redo_attempt_type,
)
.await
};
@@ -221,6 +232,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
redo_attempt_type,
)
.await
}
@@ -445,6 +457,7 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: PgMajorVersion,
max_retry_attempts: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
@@ -485,17 +498,28 @@ impl PostgresRedoManager {
);
if let Err(e) = result.as_ref() {
error!(
"error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
n_attempts,
e,
);
macro_rules! message {
($level:tt) => {
$level!(
"error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
key,
redo_attempt_type,
base_img_lsn,
lsn,
n_attempts,
e,
)
}
}
match redo_attempt_type {
RedoAttemptType::ReadPage => message!(error),
RedoAttemptType::LegacyCompaction => message!(error),
RedoAttemptType::GcCompaction => message!(warn),
}
}
result.map_err(Error::Other)

View File

@@ -1045,6 +1045,34 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that the page LSN returned by the PS to a replica is not beyond the replay LSN.
* This can happen only in the case of a deteriorated lease.
*/
static bool
check_page_lsn(NeonGetPageResponse* resp, XLogRecPtr* replay_lsn_ptr)
{
if (RecoveryInProgress())
{
XLogRecPtr page_lsn = PageGetLSN((Page)resp->page);
#if PG_VERSION_NUM >= 150000
XLogRecPtr replay_lsn = GetCurrentReplayRecPtr(NULL);
#else
/*
* PG14 doesn't have the GetCurrentReplayRecPtr() function, which returns the end of the currently applied record,
* and GetXLogReplayRecPtr returns the end of the WAL records that have already been applied.
* So we have to use this hack with resp->req.lsn, which is expected to contain the end record pointer in this case.
* But it works only for protocol version v3.
*/
XLogRecPtr replay_lsn = Max(GetXLogReplayRecPtr(NULL), resp->req.hdr.lsn);
#endif
if (replay_lsn_ptr)
*replay_lsn_ptr = replay_lsn;
return replay_lsn == 0 || page_lsn <= replay_lsn;
}
return true;
}
/*
* Look up already-received prefetch requests. Only received responses matching the required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1068,7 +1096,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1101,8 +1129,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
resp = (NeonGetPageResponse*)slot->response;
/*
* Ignore "in-future" responses caused by deteriorated lease
*/
if (!check_page_lsn(resp, NULL))
{
continue;
}
memcpy(buffers[i], resp->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -2227,6 +2263,15 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
XLogRecPtr replay_lsn;
if (!check_page_lsn(getpage_resp, &replay_lsn))
{
/* An alternative to throwing an error is to repeat the query with request_lsn=replay_lsn */
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("There is no more version of page %u of relation %u/%u/%u.%u at LSN %X/%X at page server, request LSN %X/%X, latest version is at LSN %X/%X",
blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(replay_lsn), LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(PageGetLSN((Page)getpage_resp->page)))));
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*

View File

@@ -162,8 +162,34 @@ typedef struct FileCacheControl
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
/*
* Estimation of working set size.
*
* This is not guarded by the lock. No locking is needed because all the
* writes to the "registers" are simple 64-bit stores, to update a
* timestamp. We assume that:
*
* - 64-bit stores are atomic. We could enforce that by using
* pg_atomic_uint64 instead of TimestampTz as the datatype in hll.h, but
* for now we just rely on it implicitly.
*
* - Even if they're not, and there is a race between two stores, it
* doesn't matter much which one wins because they're both updating the
* register with the current timestamp. Or you have a race between
* resetting the register and updating it, in which case it also doesn't
* matter much which one wins.
*
* - If they're not atomic, you might get an occasional "torn write" if
* you're really unlucky, but we tolerate that too. It just means that
* the estimate will be a little off, until the register is updated
* again.
*/
HyperLogLogState wss_estimation;
/* Prewarmer state */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
@@ -205,6 +231,8 @@ bool AmPrewarmWorker;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
/*
* Close LFC file if opened.
* All backends should close their LFC files once LFC is disabled.
@@ -1142,6 +1170,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
for (int i = 0; i < nblocks; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
@@ -1220,14 +1255,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (entry == NULL)
{
/* Pages are not cached */
@@ -1504,9 +1531,15 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.forkNum = forknum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
@@ -1524,19 +1557,13 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %u because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1649,9 +1676,15 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
for (int i = 0; i < nblocks; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -1692,14 +1725,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
cv = &lfc_ctl->cv[hash % N_COND_VARS];
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
/*
@@ -2135,40 +2160,23 @@ local_cache_pages(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
/*
* Internal implementation of the approximate_working_set_size_seconds()
* function.
*/
int32
lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
{
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}
int32 dc;
PG_FUNCTION_INFO_V1(approximate_working_set_size);
if (lfc_size_limit == 0)
return -1;
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
if (lfc_size_limit != 0)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
return dc;
}
PG_FUNCTION_INFO_V1(get_local_cache_state);

View File

@@ -47,7 +47,8 @@ extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blk
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -561,6 +561,8 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds);
PG_FUNCTION_INFO_V1(approximate_working_set_size);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -607,6 +609,34 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
Datum
approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
{
time_t duration;
int32 dc;
duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0);
dc = lfc_approximate_working_set_size_seconds(duration, false);
if (dc < 0)
PG_RETURN_NULL();
else
PG_RETURN_INT32(dc);
}
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
bool reset = PG_GETARG_BOOL(0);
int32 dc;
dc = lfc_approximate_working_set_size_seconds(-1, reset);
if (dc < 0)
PG_RETURN_NULL();
else
PG_RETURN_INT32(dc);
}
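/*
 * Illustrative SQL-level usage of the two functions above (assuming the extension's SQL
 * signatures match these C entry points):
 *
 *   SELECT approximate_working_set_size_seconds(300);  -- estimate over the last 5 minutes
 *   SELECT approximate_working_set_size(true);         -- estimate over all history, then reset
 */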
#if PG_MAJORVERSION_NUM >= 16
static void
neon_shmem_startup_hook(void)

View File

@@ -236,13 +236,13 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
bool save_neon_test_evict;
/*
* Temporarily set the zenith_test_evict GUC, so that when we pin and
* Temporarily set the neon_test_evict GUC, so that when we pin and
* unpin a buffer, the buffer is evicted. We use that hack to evict all
* buffers, as there is no explicit "evict this buffer" function in the
* buffer manager.
*/
save_neon_test_evict = zenith_test_evict;
zenith_test_evict = true;
save_neon_test_evict = neon_test_evict;
neon_test_evict = true;
PG_TRY();
{
/* Scan through all the buffers */
@@ -273,7 +273,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
/*
* Pin the buffer, and release it again. Because we have
* zenith_test_evict==true, this will evict the page from the
* neon_test_evict==true, this will evict the page from the
* buffer cache if no one else is holding a pin on it.
*/
if (isvalid)
@@ -286,7 +286,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
PG_FINALLY();
{
/* restore the GUC */
zenith_test_evict = save_neon_test_evict;
neon_test_evict = save_neon_test_evict;
}
PG_END_TRY();

View File

@@ -2953,17 +2953,17 @@ XmlTableBuilderData
YYLTYPE
YYSTYPE
YY_BUFFER_STATE
ZenithErrorResponse
ZenithExistsRequest
ZenithExistsResponse
ZenithGetPageRequest
ZenithGetPageResponse
ZenithMessage
ZenithMessageTag
ZenithNblocksRequest
ZenithNblocksResponse
ZenithRequest
ZenithResponse
NeonErrorResponse
NeonExistsRequest
NeonExistsResponse
NeonGetPageRequest
NeonGetPageResponse
NeonMessage
NeonMessageTag
NeonNblocksRequest
NeonNblocksResponse
NeonRequest
NeonResponse
_SPI_connection
_SPI_plan
__AssignProcessToJobObject

poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -2326,25 +2326,6 @@ files = [
{file = "propcache-0.2.0.tar.gz", hash = "sha256:df81779732feb9d01e5d513fad0122efb3d53bbc75f61b2a4f29a020bc985e70"},
]
[[package]]
name = "protobuf"
version = "6.31.1"
description = ""
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"},
{file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"},
{file = "protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402"},
{file = "protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39"},
{file = "protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6"},
{file = "protobuf-6.31.1-cp39-cp39-win32.whl", hash = "sha256:0414e3aa5a5f3ff423828e1e6a6e907d6c65c1d5b7e6e975793d5590bdeecc16"},
{file = "protobuf-6.31.1-cp39-cp39-win_amd64.whl", hash = "sha256:8764cf4587791e7564051b35524b72844f845ad0bb011704c3736cce762d8fe9"},
{file = "protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e"},
{file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"},
]
[[package]]
name = "psutil"
version = "5.9.4"
@@ -3328,18 +3309,6 @@ files = [
[package.dependencies]
cryptography = "*"
[[package]]
name = "types-protobuf"
version = "6.30.2.20250516"
description = "Typing stubs for protobuf"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722"},
{file = "types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41"},
]
[[package]]
name = "types-psutil"
version = "5.9.5.12"
@@ -3878,4 +3847,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "7cc735f57c2760db6c994575a98d4f0e2670497ad9e909135a3bc67d479f5edf"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"

View File

@@ -16,6 +16,7 @@ async-compression.workspace = true
async-trait.workspace = true
atomic-take.workspace = true
aws-config.workspace = true
aws-credential-types.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
@@ -48,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
itoa.workspace = true
json = { path = "../libs/proxy/json" }
lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
@@ -127,4 +129,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"
tracing-test = "0.2"

View File

@@ -123,6 +123,11 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_pl
docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';"
```
If you want to test query cancellation, Redis is also required:
```sh
docker run --detach --name proxy-redis --publish 6379:6379 redis:7.0
```
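
You can optionally check that Redis is reachable before starting the proxy (a quick sanity check; `redis-cli` ships inside the `redis:7.0` image):
```sh
docker exec -it proxy-redis redis-cli ping
```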
Let's create a self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build"
@@ -130,7 +135,10 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key
Then we need to build the proxy with the 'testing' feature and run it, e.g.:
```sh
RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key
RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- \
--auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' \
--redis-auth-type="plain" --redis-plain="redis://127.0.0.1:6379" \
-c server.crt -k server.key
```
Now from client you can start a new session:

View File

@@ -7,13 +7,17 @@ use std::pin::pin;
use std::sync::Mutex;
use scopeguard::ScopeGuard;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use crate::ext::LockExt;
type ProcResult<P> = Result<<P as QueueProcessing>::Res, <P as QueueProcessing>::Err>;
pub trait QueueProcessing: Send + 'static {
type Req: Send + 'static;
type Res: Send;
type Err: Send + Clone;
/// Get the desired batch size.
fn batch_size(&self, queue_size: usize) -> usize;
@@ -24,7 +28,18 @@ pub trait QueueProcessing: Send + 'static {
/// If this apply can error, it's expected that errors be forwarded to each Self::Res.
///
/// Batching does not need to happen atomically.
fn apply(&mut self, req: Vec<Self::Req>) -> impl Future<Output = Vec<Self::Res>> + Send;
fn apply(
&mut self,
req: Vec<Self::Req>,
) -> impl Future<Output = Result<Vec<Self::Res>, Self::Err>> + Send;
}
#[derive(thiserror::Error)]
pub enum BatchQueueError<E: Clone, C> {
#[error(transparent)]
Result(E),
#[error(transparent)]
Cancelled(C),
}
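// Illustrative caller-side handling (a sketch; mirrors how the cancellation code below consumes this type):
// match queue.call(req, cancelled).await {
//     Ok(res) => { /* per-item success */ }
//     Err(BatchQueueError::Result(err)) => { /* whole-batch failure, cloned to every waiter */ }
//     Err(BatchQueueError::Cancelled(c)) => { /* the caller's cancellation future resolved first */ }
// }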
pub struct BatchQueue<P: QueueProcessing> {
@@ -34,7 +49,7 @@ pub struct BatchQueue<P: QueueProcessing> {
struct BatchJob<P: QueueProcessing> {
req: P::Req,
res: tokio::sync::oneshot::Sender<P::Res>,
res: tokio::sync::oneshot::Sender<Result<P::Res, P::Err>>,
}
impl<P: QueueProcessing> BatchQueue<P> {
@@ -55,11 +70,11 @@ impl<P: QueueProcessing> BatchQueue<P> {
&self,
req: P::Req,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, R> {
) -> Result<P::Res, BatchQueueError<P::Err, R>> {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let mut cancelled = pin!(cancelled);
let resp = loop {
let resp: Option<Result<P::Res, P::Err>> = loop {
// try become the leader, or try wait for success.
let mut processor = tokio::select! {
// try become leader.
@@ -72,7 +87,7 @@ impl<P: QueueProcessing> BatchQueue<P> {
if inner.queue.remove(&id).is_some() {
tracing::warn!("batched task cancelled before completion");
}
return Err(cancel);
return Err(BatchQueueError::Cancelled(cancel));
},
};
@@ -96,18 +111,30 @@ impl<P: QueueProcessing> BatchQueue<P> {
// good: we didn't get cancelled.
ScopeGuard::into_inner(cancel_safety);
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
match values {
Ok(values) => {
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
if tx.send(value).is_err() {
// receiver hung up but that's fine.
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
if tx.send(Ok(value)).is_err() {
// receiver hung up but that's fine.
}
}
}
Err(err) => {
for tx in resps {
if tx.send(Err(err.clone())).is_err() {
// receiver hung up but that's fine.
}
}
}
}
@@ -129,7 +156,8 @@ impl<P: QueueProcessing> BatchQueue<P> {
tracing::debug!(id, "batch: job completed");
Ok(resp.expect("no response found. batch processer should not panic"))
resp.expect("no response found. batch processer should not panic")
.map_err(BatchQueueError::Result)
}
}
@@ -139,8 +167,8 @@ struct BatchQueueInner<P: QueueProcessing> {
}
impl<P: QueueProcessing> BatchQueueInner<P> {
fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver<P::Res>) {
let (tx, rx) = tokio::sync::oneshot::channel();
fn register_job(&mut self, req: P::Req) -> (u64, oneshot::Receiver<ProcResult<P>>) {
let (tx, rx) = oneshot::channel();
let id = self.version;
@@ -158,7 +186,7 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
(id, rx)
}
fn get_batch(&mut self, p: &P) -> (Vec<P::Req>, Vec<tokio::sync::oneshot::Sender<P::Res>>) {
fn get_batch(&mut self, p: &P) -> (Vec<P::Req>, Vec<oneshot::Sender<ProcResult<P>>>) {
let batch_size = p.batch_size(self.queue.len());
let mut reqs = Vec::with_capacity(batch_size);
let mut resps = Vec::with_capacity(batch_size);

View File

@@ -522,15 +522,7 @@ pub async fn run() -> anyhow::Result<()> {
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
}
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend
&& let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api
&& let Some(client) = redis_client
{
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
if let Some(client) = redis_client {
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
@@ -560,6 +552,16 @@ pub async fn run() -> anyhow::Result<()> {
}
}
}
#[allow(irrefutable_let_patterns)]
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend
&& let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api
{
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client, cache.clone()));
maintenance_tasks.spawn(async move { cache.gc_worker().await });
}
}
let maintenance = loop {

View File

@@ -4,12 +4,11 @@ use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use anyhow::anyhow;
use futures::FutureExt;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::RawCancelToken;
use postgres_client::tls::MakeTlsConnect;
use redis::{Cmd, FromRedisValue, Value};
use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
@@ -18,7 +17,7 @@ use tracing::{debug, error, info};
use crate::auth::AuthError;
use crate::auth::backend::ComputeUserInfo;
use crate::batch::{BatchQueue, QueueProcessing};
use crate::batch::{BatchQueue, BatchQueueError, QueueProcessing};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::ControlPlaneApi;
@@ -28,7 +27,7 @@ use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, Redis
use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::RedisKVClient;
use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError};
type IpSubnetKey = IpNet;
@@ -45,6 +44,17 @@ pub enum CancelKeyOp {
GetCancelData {
key: CancelKeyData,
},
GetCancelDataOld {
key: CancelKeyData,
},
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum PipelineError {
#[error("could not send cmd to redis: {0}")]
RedisKVClient(Arc<RedisKVClientError>),
#[error("incorrect number of responses from redis")]
IncorrectNumberOfResponses,
}
pub struct Pipeline {
@@ -60,7 +70,7 @@ impl Pipeline {
}
}
async fn execute(self, client: &mut RedisKVClient) -> Vec<anyhow::Result<Value>> {
async fn execute(self, client: &mut RedisKVClient) -> Result<Vec<Value>, PipelineError> {
let responses = self.replies;
let batch_size = self.inner.len();
@@ -78,30 +88,20 @@ impl Pipeline {
batch_size,
responses, "successfully completed cancellation jobs",
);
values.into_iter().map(Ok).collect()
Ok(values.into_iter().collect())
}
Ok(value) => {
error!(batch_size, ?value, "unexpected redis return value");
std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis")))
.take(responses)
.collect()
}
Err(err) => {
std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}")))
.take(responses)
.collect()
Err(PipelineError::IncorrectNumberOfResponses)
}
Err(err) => Err(PipelineError::RedisKVClient(Arc::new(err))),
}
}
fn add_command_with_reply(&mut self, cmd: Cmd) {
fn add_command(&mut self, cmd: Cmd) {
self.inner.add_command(cmd);
self.replies += 1;
}
fn add_command_no_reply(&mut self, cmd: Cmd) {
self.inner.add_command(cmd).ignore();
}
}
impl CancelKeyOp {
@@ -109,12 +109,19 @@ impl CancelKeyOp {
match self {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value));
pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64));
pipe.add_command(Cmd::set_options(
&key,
&**value,
SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
));
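// Resulting command is roughly `SET <cancel-key> <value> EX <expire-secs>` (illustrative),
// a single round trip replacing the previous HSET + EXPIRE pair.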
}
CancelKeyOp::GetCancelDataOld { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::hget(key, "data"));
}
CancelKeyOp::GetCancelData { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_with_reply(Cmd::hget(key, "data"));
pipe.add_command(Cmd::get(key));
}
}
}
@@ -127,13 +134,14 @@ pub struct CancellationProcessor {
impl QueueProcessing for CancellationProcessor {
type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp);
type Res = anyhow::Result<redis::Value>;
type Res = redis::Value;
type Err = PipelineError;
fn batch_size(&self, _queue_size: usize) -> usize {
self.batch_size
}
async fn apply(&mut self, batch: Vec<Self::Req>) -> Vec<Self::Res> {
async fn apply(&mut self, batch: Vec<Self::Req>) -> Result<Vec<Self::Res>, Self::Err> {
if !self.client.credentials_refreshed() {
// this will cause a timeout for cancellation operations
tracing::debug!(
@@ -244,18 +252,18 @@ impl CancellationHandler {
&self,
key: CancelKeyData,
) -> Result<Option<CancelClosure>, CancelError> {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelData { key };
const TIMEOUT: Duration = Duration::from_secs(5);
let Some(tx) = self.tx.get() else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
};
const TIMEOUT: Duration = Duration::from_secs(5);
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Get);
let op = CancelKeyOp::GetCancelData { key };
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
@@ -264,10 +272,37 @@ impl CancellationHandler {
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
// cannot be cancelled
.unwrap_or_else(|x| match x {})
.map_err(|e| {
})?;
// We may still have cancel keys set with HSET <key> "data".
// Check error type and retry with HGET.
// TODO: remove this code once HSET is no longer used.
let result = if let Err(err) = result.as_ref()
&& let BatchQueueError::Result(err) = err
&& let PipelineError::RedisKVClient(err) = err
&& let RedisKVClientError::Redis(err) = &**err
&& let Some(errcode) = err.code()
&& errcode == "WRONGTYPE"
{
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelDataOld { key };
timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
} else {
result
};
let result = result.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
@@ -442,7 +477,7 @@ impl Session {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HSet);
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::StoreCancelKey {
key: self.key,
value: closure_json.clone(),
@@ -456,7 +491,7 @@ impl Session {
);
match tx.call((guard, op), cancel.as_mut()).await {
Ok(Ok(_)) => {
Ok(_) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
@@ -467,10 +502,10 @@ impl Session {
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
// retry immediately.
Ok(Err(error)) => {
Err(BatchQueueError::Result(error)) => {
tracing::warn!(?error, "error registering cancellation key");
}
Err(Err(_cancelled)) => break,
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
}
}

View File

@@ -374,11 +374,9 @@ pub enum Waiting {
#[label(singleton = "kind")]
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
HSet,
HSetMultiple,
Set,
Get,
HGet,
HGetAll,
HDel,
}
#[derive(Default, Clone)]

View File

@@ -4,11 +4,12 @@ use std::time::Duration;
use futures::FutureExt;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisError, RedisResult};
use tokio::task::AbortHandle;
use tracing::{error, info, warn};
use super::elasticache::CredentialsProvider;
use crate::redis::elasticache::CredentialsProviderError;
enum Credentials {
Static(ConnectionInfo),
@@ -26,6 +27,14 @@ impl Clone for Credentials {
}
}
#[derive(thiserror::Error, Debug)]
pub enum ConnectionProviderError {
#[error(transparent)]
Redis(#[from] RedisError),
#[error(transparent)]
CredentialsProvider(#[from] CredentialsProviderError),
}
/// A wrapper around `redis::MultiplexedConnection` that automatically refreshes the token.
/// Provides PubSub connection without credentials refresh.
pub struct ConnectionWithCredentialsProvider {
@@ -86,15 +95,18 @@ impl ConnectionWithCredentialsProvider {
}
}
async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> {
redis::cmd("PING").query_async(con).await
async fn ping(con: &mut MultiplexedConnection) -> Result<(), ConnectionProviderError> {
redis::cmd("PING")
.query_async(con)
.await
.map_err(Into::into)
}
pub(crate) fn credentials_refreshed(&self) -> bool {
self.credentials_refreshed.load(Ordering::Relaxed)
}
pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
pub(crate) async fn connect(&mut self) -> Result<(), ConnectionProviderError> {
let _guard = self.mutex.lock().await;
if let Some(con) = self.con.as_mut() {
match Self::ping(con).await {
@@ -141,7 +153,7 @@ impl ConnectionWithCredentialsProvider {
Ok(())
}
async fn get_connection_info(&self) -> anyhow::Result<ConnectionInfo> {
async fn get_connection_info(&self) -> Result<ConnectionInfo, ConnectionProviderError> {
match &self.credentials {
Credentials::Static(info) => Ok(info.clone()),
Credentials::Dynamic(provider, addr) => {
@@ -160,7 +172,7 @@ impl ConnectionWithCredentialsProvider {
}
}
async fn get_client(&self) -> anyhow::Result<redis::Client> {
async fn get_client(&self) -> Result<redis::Client, ConnectionProviderError> {
let client = redis::Client::open(self.get_connection_info().await?)?;
self.credentials_refreshed.store(true, Ordering::Relaxed);
Ok(client)

View File

@@ -9,10 +9,12 @@ use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_credential_types::provider::error::CredentialsError;
use aws_sdk_iam::config::ProvideCredentials;
use aws_sigv4::http_request::{
self, SignableBody, SignableRequest, SignatureLocation, SigningSettings,
self, SignableBody, SignableRequest, SignatureLocation, SigningError, SigningSettings,
};
use aws_sigv4::sign::v4::signing_params::BuildError;
use tracing::info;
#[derive(Debug)]
@@ -40,6 +42,18 @@ impl AWSIRSAConfig {
}
}
#[derive(thiserror::Error, Debug)]
pub enum CredentialsProviderError {
#[error(transparent)]
AwsCredentials(#[from] CredentialsError),
#[error(transparent)]
AwsSigv4Build(#[from] BuildError),
#[error(transparent)]
AwsSigv4Singing(#[from] SigningError),
#[error(transparent)]
Http(#[from] http::Error),
}
/// Credentials provider for AWS elasticache authentication.
///
/// Official documentation:
@@ -92,7 +106,9 @@ impl CredentialsProvider {
})
}
pub(crate) async fn provide_credentials(&self) -> anyhow::Result<(String, String)> {
pub(crate) async fn provide_credentials(
&self,
) -> Result<(String, String), CredentialsProviderError> {
let aws_credentials = self
.credentials_provider
.provide_credentials()

View File

@@ -2,9 +2,18 @@ use std::time::Duration;
use futures::FutureExt;
use redis::aio::ConnectionLike;
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult};
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::redis::connection_with_credentials_provider::ConnectionProviderError;
#[derive(thiserror::Error, Debug)]
pub enum RedisKVClientError {
#[error(transparent)]
Redis(#[from] RedisError),
#[error(transparent)]
ConnectionProvider(#[from] ConnectionProviderError),
}
pub struct RedisKVClient {
client: ConnectionWithCredentialsProvider,
@@ -32,12 +41,13 @@ impl RedisKVClient {
Self { client }
}
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> {
self.client
.connect()
.boxed()
.await
.inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
.map_err(Into::into)
}
pub(crate) fn credentials_refreshed(&self) -> bool {
@@ -47,7 +57,7 @@ impl RedisKVClient {
pub(crate) async fn query<T: FromRedisValue>(
&mut self,
q: &impl Queryable,
) -> anyhow::Result<T> {
) -> Result<T, RedisKVClientError> {
let e = match q.query(&mut self.client).await {
Ok(t) => return Ok(t),
Err(e) => e,

View File

@@ -1,6 +1,7 @@
use json::{ListSer, ObjectSer, ValueSer};
use postgres_client::Row;
use postgres_client::types::{Kind, Type};
use serde_json::{Map, Value};
use serde_json::Value;
//
// Convert json non-string types to strings, so that they can be passed to Postgres
@@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError {
UnbalancedString,
}
enum OutputMode {
Array(Vec<Value>),
Object(Map<String, Value>),
enum OutputMode<'a> {
Array(ListSer<'a>),
Object(ObjectSer<'a>),
}
impl OutputMode {
fn key(&mut self, key: &str) -> &mut Value {
impl OutputMode<'_> {
fn key(&mut self, key: &str) -> ValueSer<'_> {
match self {
OutputMode::Array(values) => push_entry(values, Value::Null),
OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
OutputMode::Array(values) => values.entry(),
OutputMode::Object(map) => map.key(key),
}
}
fn finish(self) -> Value {
fn finish(self) {
match self {
OutputMode::Array(values) => Value::Array(values),
OutputMode::Object(map) => Value::Object(map),
OutputMode::Array(values) => values.finish(),
OutputMode::Object(map) => map.finish(),
}
}
}
fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
arr.push(t);
arr.last_mut().expect("a value was just inserted")
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub(crate) fn pg_text_row_to_json(
output: ValueSer,
row: &Row,
raw_output: bool,
array_mode: bool,
) -> Result<Value, JsonConversionError> {
) -> Result<(), JsonConversionError> {
let mut entries = if array_mode {
OutputMode::Array(Vec::with_capacity(row.columns().len()))
OutputMode::Array(output.list())
} else {
OutputMode::Object(Map::with_capacity(row.columns().len()))
OutputMode::Object(output.object())
};
for (i, column) in row.columns().iter().enumerate() {
@@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json(
let value = entries.key(column.name());
match pg_value {
Some(v) if raw_output => *value = Value::String(v.to_string()),
Some(v) if raw_output => value.value(v),
Some(v) => pg_text_to_json(value, v, column.type_())?,
None => *value = Value::Null,
None => value.value(json::Null),
}
}
Ok(entries.finish())
entries.finish();
Ok(())
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(
output: &mut Value,
val: &str,
pg_type: &Type,
) -> Result<(), JsonConversionError> {
fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> {
if let Kind::Array(elem_type) = pg_type.kind() {
// todo: we should fetch this from postgres.
let delimiter = ',';
let mut array = vec![];
pg_array_parse(&mut array, val, elem_type, delimiter)?;
*output = Value::Array(array);
json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?);
return Ok(());
}
match *pg_type {
Type::BOOL => *output = Value::Bool(val == "t"),
Type::BOOL => output.value(val == "t"),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
*output = Value::Number(serde_json::Number::from(val));
output.value(val);
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
*output = Value::Number(num);
if fval.is_finite() {
output.value(fval);
} else {
// Pass NaN, Inf, -Inf as strings
// JS JSON.stringify() converts them to null, but we
// want to preserve them, so we pass them as strings
*output = Value::String(val.to_string());
output.value(val);
}
}
Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
_ => *output = Value::String(val.to_string()),
// we assume that the string value is valid json.
Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()),
_ => output.value(val),
}
Ok(())
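For orientation, here are a few hedged examples of what the match arms above are expected to produce; the input/output pairs are assumptions read off this diff rather than fixtures from the test suite:

# Python sketch of pg_text_to_json's scalar conversions (illustrative values).
scalar_examples = [
    ("t",        "BOOL",   True),      # Postgres encodes bools as "t"/"f"
    ("42",       "INT4",   42),        # INT2/INT4 become JSON numbers
    ("1.5",      "FLOAT8", 1.5),       # finite floats become JSON numbers
    ("NaN",      "FLOAT8", "NaN"),     # NaN/Infinity are preserved as strings
    ('{"a": 1}', "JSONB",  {"a": 1}),  # JSON/JSONB text is passed through verbatim
    ("hello",    "TEXT",   "hello"),   # any other type stays a string
]
for text, pg_type, expected in scalar_examples:
    print(f"{pg_type:6} {text!r} -> {expected!r}")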
@@ -192,7 +184,7 @@ fn pg_text_to_json(
/// gets its own level of curly braces, and delimiters must be written between adjacent
/// curly-braced entities of the same level.
fn pg_array_parse(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &str,
elem: &Type,
delim: char,
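The doc comment above describes Postgres's text encoding for arrays. A few hedged examples of that encoding and the JSON the parser is expected to emit follow; the expected values are assumptions based on the parsing code in this diff, not output captured from the tests:

# Python sketch: Postgres array text -> JSON, per the parser above.
array_examples = [
    ("{1,2,3}",        [1, 2, 3]),           # int4[]
    ("{{1,2},{3,4}}",  [[1, 2], [3, 4]]),    # each dimension gets its own braces
    ('{"a b",c,NULL}', ["a b", "c", None]),  # quoted item, bare item, SQL NULL
    ("{t,f}",          [True, False]),       # bool[] elements go through pg_text_to_json
]
for pg_text, expected_json in array_examples:
    print(f"{pg_text!r} -> {expected_json!r}")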
@@ -221,7 +213,7 @@ fn pg_array_parse(
/// reads a single array from the `pg_array` string and pushes each value to `elements`.
/// returns the rest of the `pg_array` string that was not read.
fn pg_array_parse_inner<'a>(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &'a str,
elem: &Type,
delim: char,
@@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>(
let mut q = String::new();
loop {
let value = push_entry(elements, Value::Null);
let value = elements.entry();
pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
// check for separator.
@@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>(
///
/// `quoted` is a scratch allocation that has no defined output.
fn pg_array_parse_item<'a>(
output: &mut Value,
output: ValueSer,
quoted: &mut String,
mut pg_array: &'a str,
elem: &Type,
@@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>(
if pg_array.starts_with('{') {
// nested array.
let mut nested = vec![];
pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
*output = Value::Array(nested);
pg_array =
json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?;
return Ok(pg_array);
}
@@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>(
// we might have an item string:
// check for null
if item == "NULL" {
*output = Value::Null;
output.value(json::Null);
} else {
pg_text_to_json(output, item, elem)?;
}
@@ -440,15 +431,15 @@ mod tests {
}
fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
let mut v = Value::Null;
super::pg_text_to_json(&mut v, val, pg_type).unwrap();
v
let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap());
serde_json::from_str(&output).unwrap()
}
fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
let mut array = vec![];
super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
Value::Array(array)
let output = json::value_to_string!(|v| json::value_as_list!(|v| {
super::pg_array_parse(v, pg_array, pg_type, ',').unwrap();
}));
serde_json::from_str(&output).unwrap()
}
#[test]

View File

@@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use serde::Serialize;
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
@@ -687,32 +684,21 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
match select(
let mut json_buf = vec![];
let batch_result = match select(
pin!(query_to_json(
config,
&mut *inner,
self,
&mut 0,
json::ValueSer::new(&mut json_buf),
parsed_headers
)),
pin!(cancel.cancelled()),
)
.await
{
// The query successfully completed.
Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
discard.check_idle(status);
let json_output =
serde_json::to_string(&results).expect("json serialization should not fail");
Ok(json_output)
}
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
Err(e)
}
// The query was cancelled.
Either::Left((res, __not_yet_cancelled)) => res,
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
@@ -721,13 +707,7 @@ impl QueryData {
// wait for the query cancellation
match time::timeout(time::Duration::from_millis(100), query).await {
// query successed before it was cancelled.
Ok(Ok((status, results))) => {
discard.check_idle(status);
let json_output = serde_json::to_string(&results)
.expect("json serialization should not fail");
Ok(json_output)
}
Ok(Ok(status)) => Ok(status),
// query failed or was cancelled.
Ok(Err(error)) => {
let db_error = match &error {
@@ -743,14 +723,29 @@ impl QueryData {
discard.discard();
}
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(_timeout) => {
discard.discard();
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
}
}
};
match batch_result {
// The query successfully completed.
Ok(status) => {
discard.check_idle(status);
let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
Ok(json_output)
}
// The query failed with an error
Err(e) => {
discard.discard();
Err(e)
}
}
}
}
@@ -787,7 +782,7 @@ impl BatchQueryData {
})
.map_err(SqlOverHttpError::Postgres)?;
let json_output = match query_batch(
let json_output = match query_batch_to_json(
config,
cancel.child_token(),
&mut transaction,
@@ -845,24 +840,21 @@ async fn query_batch(
transaction: &mut Transaction<'_>,
queries: BatchQueryData,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let mut results = Vec::with_capacity(queries.queries.len());
let mut current_size = 0;
results: &mut json::ListSer<'_>,
) -> Result<(), SqlOverHttpError> {
for stmt in queries.queries {
let query = pin!(query_to_json(
config,
transaction,
stmt,
&mut current_size,
results.entry(),
parsed_headers,
));
let cancelled = pin!(cancel.cancelled());
let res = select(query, cancelled).await;
match res {
// TODO: maybe we should check that the transaction bit is set here
Either::Left((Ok((_, values)), _cancelled)) => {
results.push(values);
}
Either::Left((Ok(_), _cancelled)) => {}
Either::Left((Err(e), _cancelled)) => {
return Err(e);
}
@@ -872,8 +864,22 @@ async fn query_batch(
}
}
let results = json!({ "results": results });
let json_output = serde_json::to_string(&results).expect("json serialization should not fail");
Ok(())
}
async fn query_batch_to_json(
config: &'static HttpConfig,
cancel: CancellationToken,
tx: &mut Transaction<'_>,
queries: BatchQueryData,
headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| {
let results = obj.key("results");
json::value_as_list!(|results| {
query_batch(config, cancel, tx, queries, headers, results).await?;
});
}));
Ok(json_output)
}
@@ -882,54 +888,54 @@ async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
client: &mut T,
data: QueryData,
current_size: &mut usize,
output: json::ValueSer<'_>,
parsed_headers: HttpHeaders,
) -> Result<(ReadyForQueryStatus, impl Serialize + use<T>), SqlOverHttpError> {
) -> Result<ReadyForQueryStatus, SqlOverHttpError> {
let query_start = Instant::now();
let query_params = data.params;
let mut output = json::ObjectSer::new(output);
let mut row_stream = client
.query_raw_txt(&data.query, query_params)
.query_raw_txt(&data.query, data.params)
.await
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut json_fields = output.key("fields").list();
for c in row_stream.statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
let json_field = json_fields.entry();
json::value_as_object!(|json_field| {
json_field.entry("name", c.name());
json_field.entry("dataTypeID", c.type_().oid());
json_field.entry("tableID", c.table_oid());
json_field.entry("columnID", c.column_id());
json_field.entry("dataTypeSize", c.type_size());
json_field.entry("dataTypeModifier", c.type_modifier());
json_field.entry("format", "text");
});
}
json_fields.finish();
let raw_output = parsed_headers.raw_output;
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
let raw_output = parsed_headers.raw_output;
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
let mut rows = Vec::new();
let mut rows = 0;
let mut json_rows = output.key("rows").list();
while let Some(row) = row_stream.next().await {
let row = row.map_err(SqlOverHttpError::Postgres)?;
*current_size += row.body_len();
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
if json_rows.as_buffer().len() > config.max_response_size_bytes {
return Err(SqlOverHttpError::ResponseTooLarge(
config.max_response_size_bytes,
));
}
let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
rows.push(row);
pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?;
rows += 1;
// assumption: parsing pg text and converting to json takes CPU time.
// let's assume it is slightly expensive, so we should consume some cooperative budget.
@@ -937,16 +943,14 @@ async fn query_to_json<T: GenericClient>(
// of rows and never hit the tokio mpsc for a long time (although unlikely).
tokio::task::consume_budget().await;
}
json_rows.finish();
let query_resp_end = Instant::now();
let RowStream {
command_tag,
status: ready,
..
} = row_stream;
let ready = row_stream.status;
// grab the command tag and number of rows affected
let command_tag = command_tag.unwrap_or_default();
let command_tag = row_stream.command_tag.unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
@@ -959,7 +963,7 @@ async fn query_to_json<T: GenericClient>(
.and_then(|s| s.parse::<i64>().ok());
info!(
rows = rows.len(),
rows,
?ready,
command_tag,
acknowledgement = ?(query_acknowledged - query_start),
@@ -967,16 +971,12 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
// Resulting JSON format is based on the format of node-postgres result.
let results = json!({
"command": command_tag_name.to_string(),
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
"rowAsArray": array_mode,
});
output.entry("command", command_tag_name);
output.entry("rowCount", command_tag_count);
output.entry("rowAsArray", array_mode);
Ok((ready, results))
output.finish();
Ok(ready)
}
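To make the streaming serialization above easier to follow, here is a hedged sketch of the JSON shape it builds per query (field metadata mirrors node-postgres; the concrete values below are invented for illustration) and of the wrapper that query_batch_to_json adds around batches:

# Python sketch of the response shapes (illustrative values only).
single_query_result = {
    "fields": [
        {
            "name": "id",
            "dataTypeID": 23,        # e.g. int4's type OID
            "tableID": 0,
            "columnID": 0,
            "dataTypeSize": 4,
            "dataTypeModifier": -1,
            "format": "text",
        }
    ],
    "rows": [{"id": 1}],             # or [[1]] when rowAsArray is true
    "command": "SELECT",
    "rowCount": 1,
    "rowAsArray": False,
}

# Batched queries are wrapped into a single top-level object.
batch_result = {"results": [single_query_result, single_query_result]}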
enum Client {

View File

@@ -10,8 +10,6 @@ psycopg2-binary = "^2.9.10"
typing-extensions = "^4.12.2"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.32.4"
protobuf = "^6.31.1"
types-protobuf="^6.30.2"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"
@@ -65,7 +63,6 @@ build-backend = "poetry.core.masonry.api"
exclude = [
"^vendor/",
"^target/",
"test_runner/regress/data/profile_pb2.py",
"test_runner/performance/pgvector/loaddata.py",
]
check_untyped_defs = true
@@ -97,7 +94,6 @@ target-version = "py311"
extend-exclude = [
"vendor/",
"target/",
"test_runner/regress/data/profile_pb2.py",
"test_runner/stubs/", # Autogenerated by mypy's stubgen
]
line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter

View File

@@ -850,6 +850,31 @@ async fn handle_tenant_describe(
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
}
/* BEGIN_HADRON */
async fn handle_tenant_timeline_describe(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Scrubber)?;
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(_req) => {}
};
json_response(
StatusCode::OK,
service
.tenant_timeline_describe(tenant_id, timeline_id)
.await?,
)
}
/* END_HADRON */
async fn handle_tenant_list(
service: Arc<Service>,
req: Request<Body>,
@@ -2480,6 +2505,13 @@ pub fn make_router(
)
})
// Timeline operations
.get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_describe,
RequestName("v1_tenant_timeline_describe"),
)
})
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(
r,

View File

@@ -222,6 +222,9 @@ struct Cli {
/// Primarily useful for testing to reduce test execution time.
#[arg(long, default_value = "false", action=ArgAction::Set)]
kick_secondary_downloads: bool,
#[arg(long)]
shard_split_request_timeout: Option<humantime::Duration>,
}
enum StrictMode {
@@ -470,6 +473,10 @@ async fn async_main() -> anyhow::Result<()> {
timeline_safekeeper_count: args.timeline_safekeeper_count,
posthog_config: posthog_config.clone(),
kick_secondary_downloads: args.kick_secondary_downloads,
shard_split_request_timeout: args
.shard_split_request_timeout
.map(humantime::Duration::into)
.unwrap_or(Duration::MAX),
};
// Validate that we can connect to the database

View File

@@ -86,6 +86,23 @@ impl PageserverClient {
)
}
/* BEGIN_HADRON */
pub(crate) async fn tenant_timeline_describe(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"tenant_timeline_describe",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.tenant_timeline_describe(tenant_shard_id, timeline_id,)
.await
)
}
/* END_HADRON */
pub(crate) async fn tenant_scan_remote_storage(
&self,
tenant_id: TenantId,

View File

@@ -32,7 +32,7 @@ use pageserver_api::controller_api::{
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, TenantTimelineDescribeResponse,
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
@@ -60,6 +60,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::env;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -483,6 +484,9 @@ pub struct Config {
/// When set, actively checks and initiates heatmap downloads/uploads.
pub kick_secondary_downloads: bool,
/// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None.
pub shard_split_request_timeout: Duration,
}
impl From<DatabaseError> for ApiError {
@@ -5206,6 +5210,9 @@ impl Service {
match res {
Ok(ok) => Ok(ok),
Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT),
Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) if msg.contains("Requested tenant is missing") => {
Err(ApiError::ResourceUnavailable("Tenant migration in progress".into()))
},
Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())),
Err(e) => {
Err(
@@ -5486,6 +5493,92 @@ impl Service {
.ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into()))
}
/* BEGIN_HADRON */
pub(crate) async fn tenant_timeline_describe(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TenantTimelineDescribeResponse, ApiError> {
self.tenant_remote_mutation(tenant_id, |locations| async move {
if locations.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
};
let locations: Vec<(TenantShardId, Node)> = locations
.0
.iter()
.map(|t| (*t.0, t.1.latest.node.clone()))
.collect();
let mut futs = FuturesUnordered::new();
for (shard_id, node) in locations {
futs.push({
async move {
let result = node
.with_client_retries(
|client| async move {
client
.tenant_timeline_describe(&shard_id, &timeline_id)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
3,
3,
Duration::from_secs(30),
&self.cancel,
)
.await;
(result, shard_id, node.get_id())
}
});
}
let mut results: Vec<TimelineInfo> = Vec::new();
while let Some((result, tenant_shard_id, node_id)) = futs.next().await {
match result {
Some(Ok(timeline_info)) => results.push(timeline_info),
Some(Err(e)) => {
tracing::warn!(
"Failed to describe tenant {} timeline {} for pageserver {}: {e}",
tenant_shard_id,
timeline_id,
node_id,
);
return Err(ApiError::ResourceUnavailable(format!("{e}").into()));
}
None => return Err(ApiError::Cancelled),
}
}
let mut image_consistent_lsn: Option<Lsn> = Some(Lsn::MAX);
for timeline_info in &results {
if let Some(tline_image_consistent_lsn) = timeline_info.image_consistent_lsn {
image_consistent_lsn = Some(std::cmp::min(
image_consistent_lsn.unwrap(),
tline_image_consistent_lsn,
));
} else {
tracing::warn!(
"Timeline {} on shard {} does not have image consistent lsn",
timeline_info.timeline_id,
timeline_info.tenant_id
);
image_consistent_lsn = None;
break;
}
}
Ok(TenantTimelineDescribeResponse {
shards: results,
image_consistent_lsn,
})
})
.await?
}
/* END_HADRON */
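A hedged Python sketch of the aggregation rule implemented by the loop above: the timeline-wide image_consistent_lsn is the minimum across shards, and it degrades to None as soon as any shard has not reported one. The numeric LSNs below are invented, and the empty-shard-list case (rejected earlier with NotFound) is not handled here:

# Python sketch of the image_consistent_lsn aggregation.
def aggregate_image_consistent_lsn(shard_lsns):
    if any(lsn is None for lsn in shard_lsns):
        return None
    return min(shard_lsns)

# Three shards with image layers up to different LSNs: the slowest one wins.
assert aggregate_image_consistent_lsn([0x3000, 0x2000, 0x2800]) == 0x2000
# One shard has no image-consistent LSN yet: the timeline-wide value is unknown.
assert aggregate_image_consistent_lsn([0x3000, None]) is None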
/// limit & offset are pagination parameters. Since we are walking an in-memory HashMap, `offset` does not
/// avoid traversing data, it just avoid returning it. This is suitable for our purposes, since our in memory
/// maps are small enough to traverse fast, our pagination is just to avoid serializing huge JSON responses
@@ -6317,18 +6410,39 @@ impl Service {
// TODO: issue split calls concurrently (this only matters once we're splitting
// N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
// HADRON: set a timeout for splitting individual shards on page servers.
// Currently we do not perform any retry because it's not clear if page server can handle
// partially split shards correctly.
let shard_split_timeout =
if let Some(env::DeploymentMode::Local) = env::get_deployment_mode() {
Duration::from_secs(30)
} else {
self.config.shard_split_request_timeout
};
let mut http_client_builder = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(0)
.timeout(shard_split_timeout);
for ssl_ca_cert in &self.config.ssl_ca_certs {
http_client_builder = http_client_builder.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client_builder
.build()
.expect("Failed to construct HTTP client");
for target in &targets {
let ShardSplitTarget {
parent_id,
node,
child_ids,
} = target;
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
);
let response = client
.tenant_shard_split(
*parent_id,

View File

@@ -25,7 +25,8 @@ use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
@@ -37,21 +38,14 @@ use utils::lsn::Lsn;
use super::Service;
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}
impl Service {
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
let members = safekeepers
.iter()
.map(|sk| sk.get_safekeeper_id())
.collect::<Vec<_>>();
MemberSet::new(members).map_err(ApiError::InternalServerError)
MemberSet::new(members)
}
fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
@@ -86,7 +80,7 @@ impl Service {
) -> Result<Vec<NodeId>, ApiError> {
let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
let mset = Self::make_member_set(&safekeepers)?;
let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);
let req = safekeeper_api::models::TimelineCreateRequest {
@@ -1111,6 +1105,26 @@ impl Service {
}
}
if new_sk_set.is_empty() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"new safekeeper set is empty"
)));
}
if new_sk_set.len() < self.config.timeline_safekeeper_count {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"new safekeeper set must have at least {} safekeepers",
self.config.timeline_safekeeper_count
)));
}
let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
// Construct new member set in advance to validate it.
// E.g. it validates that there are no duplicate safekeepers.
let new_sk_member_set =
Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
@@ -1141,6 +1155,18 @@ impl Service {
.map(|&id| NodeId(id as u64))
.collect::<Vec<_>>();
// Validate that we are not migrating to a decommissioned safekeeper.
for sk in new_safekeepers.iter() {
if !cur_sk_set.contains(&sk.get_id())
&& sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
{
return Err(ApiError::BadRequest(anyhow::anyhow!(
"safekeeper {} is decomissioned",
sk.get_id()
)));
}
}
tracing::info!(
?cur_sk_set,
?new_sk_set,
@@ -1183,11 +1209,8 @@ impl Service {
}
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?;
let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
let new_sk_member_set = Self::make_member_set(&new_safekeepers)?;
let cur_sk_member_set =
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
let joint_config = membership::Configuration {
generation,

View File

@@ -115,39 +115,6 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def start_profiling_cpu(
self, sampling_frequency: int, timeout_seconds: int, archive: bool = False
) -> tuple[int, bytes]:
url = f"http://localhost:{self.external_port}/profile/cpu"
params = {
"profiler": {"BccProfile": None},
"sampling_frequency": sampling_frequency,
"timeout_seconds": timeout_seconds,
"archive": archive,
}
res = self.post(
url,
json=params,
auth=self.auth,
)
return res.status_code, res.content
def stop_profiling_cpu(self) -> int:
url = f"http://localhost:{self.external_port}/profile/cpu"
res = self.delete(url, auth=self.auth)
return res.status_code
def get_profiling_cpu_status(self) -> bool:
"""
Returns True if CPU profiling is currently running, False otherwise.
"""
url = f"http://localhost:{self.external_port}/profile/cpu"
res = self.get(url, auth=self.auth)
res.raise_for_status()
return res.status_code == 200
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -2342,6 +2342,20 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return response.json()
# HADRON
def tenant_timeline_describe(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
):
response = self.request(
"GET",
f"{self.api}/control/v1/tenant/{tenant_id}/timeline/{timeline_id}",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
return response.json()
def nodes(self):
"""
:return: list of {"id": ""}
@@ -5395,6 +5409,7 @@ SKIP_FILES = frozenset(
(
"pg_internal.init",
"pg.log",
"neon.signal",
"zenith.signal",
"pg_hba.conf",
"postgresql.conf",

View File

@@ -115,8 +115,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*Local data loss suspected.*",
# Too many frozen layers error is normal during intensive benchmarks
".*too many frozen layers.*",
# Transient errors when resolving tenant shards by page service
".*Fail to resolve tenant shard in attempt.*",
".*Failed to resolve tenant shard after.*",
# Expected warnings when pageserver has not refreshed GC info yet
".*pitr LSN/interval not found, skipping force image creation LSN calculation.*",
".*No broker updates received for a while.*",

View File

@@ -6,7 +6,6 @@ import json
import os
import re
import subprocess
import sys
import tarfile
import threading
import time
@@ -718,33 +717,6 @@ def skip_in_debug_build(reason: str):
)
def run_only_on_linux_kernel_higher_than(version: float, reason: str):
"""
Skip tests if the Linux kernel version is lower than the specified version.
The version is specified as a float, e.g. 5.4 for kernel.
Also skips if the host is not Linux.
"""
should_skip = False
if sys.platform != "linux":
should_skip = True
else:
try:
kernel_version_list = os.uname()[2].split("-")[0].split(".")
kernel_version = float(f"{kernel_version_list[0]}.{kernel_version_list[1]}")
if kernel_version < version:
should_skip = True
except ValueError:
log.error(f"Failed to parse kernel version: {os.uname()[2]}")
should_skip = True
return pytest.mark.skipif(
should_skip,
reason=reason,
)
def skip_on_ci(reason: str):
# `CI` variable is always set to `true` on GitHub
# Ref: https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/store-information-in-variables#default-environment-variables

View File

@@ -1,41 +0,0 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# type: ignore
# source: profile.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rprofile.proto\x12\x12perftools.profiles\"\xd5\x03\n\x07Profile\x12\x32\n\x0bsample_type\x18\x01 \x03(\x0b\x32\x1d.perftools.profiles.ValueType\x12*\n\x06sample\x18\x02 \x03(\x0b\x32\x1a.perftools.profiles.Sample\x12,\n\x07mapping\x18\x03 \x03(\x0b\x32\x1b.perftools.profiles.Mapping\x12.\n\x08location\x18\x04 \x03(\x0b\x32\x1c.perftools.profiles.Location\x12.\n\x08\x66unction\x18\x05 \x03(\x0b\x32\x1c.perftools.profiles.Function\x12\x14\n\x0cstring_table\x18\x06 \x03(\t\x12\x13\n\x0b\x64rop_frames\x18\x07 \x01(\x03\x12\x13\n\x0bkeep_frames\x18\x08 \x01(\x03\x12\x12\n\ntime_nanos\x18\t \x01(\x03\x12\x16\n\x0e\x64uration_nanos\x18\n \x01(\x03\x12\x32\n\x0bperiod_type\x18\x0b \x01(\x0b\x32\x1d.perftools.profiles.ValueType\x12\x0e\n\x06period\x18\x0c \x01(\x03\x12\x0f\n\x07\x63omment\x18\r \x03(\x03\x12\x1b\n\x13\x64\x65\x66\x61ult_sample_type\x18\x0e \x01(\x03\"\'\n\tValueType\x12\x0c\n\x04type\x18\x01 \x01(\x03\x12\x0c\n\x04unit\x18\x02 \x01(\x03\"V\n\x06Sample\x12\x13\n\x0blocation_id\x18\x01 \x03(\x04\x12\r\n\x05value\x18\x02 \x03(\x03\x12(\n\x05label\x18\x03 \x03(\x0b\x32\x19.perftools.profiles.Label\"@\n\x05Label\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\x0b\n\x03str\x18\x02 \x01(\x03\x12\x0b\n\x03num\x18\x03 \x01(\x03\x12\x10\n\x08num_unit\x18\x04 \x01(\x03\"\xdd\x01\n\x07Mapping\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x14\n\x0cmemory_start\x18\x02 \x01(\x04\x12\x14\n\x0cmemory_limit\x18\x03 \x01(\x04\x12\x13\n\x0b\x66ile_offset\x18\x04 \x01(\x04\x12\x10\n\x08\x66ilename\x18\x05 \x01(\x03\x12\x10\n\x08\x62uild_id\x18\x06 \x01(\x03\x12\x15\n\rhas_functions\x18\x07 \x01(\x08\x12\x15\n\rhas_filenames\x18\x08 \x01(\x08\x12\x18\n\x10has_line_numbers\x18\t \x01(\x08\x12\x19\n\x11has_inline_frames\x18\n \x01(\x08\"v\n\x08Location\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x12\n\nmapping_id\x18\x02 \x01(\x04\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\x04\x12&\n\x04line\x18\x04 \x03(\x0b\x32\x18.perftools.profiles.Line\x12\x11\n\tis_folded\x18\x05 \x01(\x08\")\n\x04Line\x12\x13\n\x0b\x66unction_id\x18\x01 \x01(\x04\x12\x0c\n\x04line\x18\x02 \x01(\x03\"_\n\x08\x46unction\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\x03\x12\x13\n\x0bsystem_name\x18\x03 \x01(\x03\x12\x10\n\x08\x66ilename\x18\x04 \x01(\x03\x12\x12\n\nstart_line\x18\x05 \x01(\x03\x42-\n\x1d\x63om.google.perftools.profilesB\x0cProfileProtob\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'profile_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\035com.google.perftools.profilesB\014ProfileProto'
_PROFILE._serialized_start=38
_PROFILE._serialized_end=507
_VALUETYPE._serialized_start=509
_VALUETYPE._serialized_end=548
_SAMPLE._serialized_start=550
_SAMPLE._serialized_end=636
_LABEL._serialized_start=638
_LABEL._serialized_end=702
_MAPPING._serialized_start=705
_MAPPING._serialized_end=926
_LOCATION._serialized_start=928
_LOCATION._serialized_end=1046
_LINE._serialized_start=1048
_LINE._serialized_end=1089
_FUNCTION._serialized_start=1091
_FUNCTION._serialized_end=1186
# @@protoc_insertion_point(module_scope)

View File

@@ -7,6 +7,7 @@ import time
from enum import StrEnum
import pytest
from fixtures.common_types import TenantShardId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -960,9 +961,70 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
return image_layer_count, delta_layer_count
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
def test_image_layer_creation_time_threshold(neon_env_builder: NeonEnvBuilder):
"""
Tests that page server can force creating new images if image creation timeout is enabled
Tests that image layers can be created when the time threshold is reached on non-0 shards.
"""
tenant_conf = {
"compaction_threshold": "100",
"image_creation_threshold": "100",
"image_layer_creation_check_threshold": "1",
# disable distance based image layer creation check
"checkpoint_distance": 10 * 1024 * 1024 * 1024,
"checkpoint_timeout": "100ms",
"image_layer_force_creation_period": "1s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
# consider every tenant large to run the image layer generation check more eagerly
neon_env_builder.pageserver_config_override = (
"image_layer_generation_large_timeline_threshold=0"
)
neon_env_builder.num_pageservers = 1
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf,
initial_tenant_shard_count=2,
initial_tenant_shard_stripe_size=1,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
for v in range(10):
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
tenant_shard_id = TenantShardId(tenant_id, 1, 2)
# Generate some rows.
for v in range(20):
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
# restart page server so that logical size on non-0 shards is missing
env.pageserver.restart()
(old_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
log.info(f"old images: {old_images}, old deltas: {old_deltas}")
def check_image_creation():
(new_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
log.info(f"images: {new_images}, deltas: {old_deltas}")
assert new_images > old_images
wait_until(check_image_creation)
endpoint.stop_and_destroy()
def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder):
"""
Tests that page server can force creating new images if image_layer_force_creation_period is enabled
"""
# use large knobs to disable L0 compaction/image creation except for the force image creation
tenant_conf = {
@@ -972,10 +1034,10 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
"checkpoint_distance": 10 * 1024,
"checkpoint_timeout": "1s",
"image_layer_force_creation_period": "1s",
# The lsn for forced image layer creations is calculated once every 10 minutes.
# Hence, drive compaction manually such that the test doesn't compute it at the
# wrong time.
"compaction_period": "0s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
# consider every tenant large to run the image layer generation check more eagerly
@@ -1018,4 +1080,69 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
)
def test_image_consistent_lsn(neon_env_builder: NeonEnvBuilder):
"""
Test the /control/v1/tenant/<tenant_id>/timeline/<timeline_id> endpoint and the computation of image_consistent_lsn
"""
# use large knobs to disable L0 compaction/image creation except for the force image creation
tenant_conf = {
"compaction_threshold": "100",
"image_creation_threshold": "100",
"image_layer_creation_check_threshold": "1",
"checkpoint_distance": 10 * 1024,
"checkpoint_timeout": "1s",
"image_layer_force_creation_period": "1s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
neon_env_builder.num_pageservers = 2
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=1,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
for v in range(10):
endpoint.safe_psql(
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
)
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
shards = response["shards"]
for shard in shards:
assert shard["image_consistent_lsn"] is not None
image_consistent_lsn = response["image_consistent_lsn"]
assert image_consistent_lsn is not None
# do more writes and wait for image_consistent_lsn to advance
for v in range(100):
endpoint.safe_psql(
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
)
def check_image_consistent_lsn_advanced():
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
new_image_consistent_lsn = response["image_consistent_lsn"]
shards = response["shards"]
for shard in shards:
print(f"shard {shard['tenant_id']} image_consistent_lsn{shard['image_consistent_lsn']}")
assert new_image_consistent_lsn != image_consistent_lsn
wait_until(check_image_consistent_lsn_advanced)
endpoint.stop_and_destroy()
for ps in env.pageservers:
ps.allowed_errors.append(".*created delta file of size.*larger than double of target.*")
# END_HADRON

View File

@@ -1,322 +0,0 @@
from __future__ import annotations
import gzip
import io
import threading
import time
from typing import TYPE_CHECKING, Any
import pytest
from data.profile_pb2 import Profile # type: ignore
from fixtures.log_helper import log
from fixtures.utils import run_only_on_default_postgres, run_only_on_linux_kernel_higher_than
from google.protobuf.message import Message
from requests import HTTPError
if TYPE_CHECKING:
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.neon_fixtures import NeonEnv
LINUX_VERSION_REQUIRED = 6.0
PG_REASON = "test doesn't use postgres in a manner that requires any specific version"
LINUX_REASON = f"test requires linux {LINUX_VERSION_REQUIRED} for ebpfs"
def _start_profiling_cpu(
client: EndpointHttpClient, event: threading.Event | None, archive: bool = False
) -> Profile | None:
"""
Start CPU profiling for the compute node.
"""
log.info("Starting CPU profiling...")
if event is not None:
event.set()
status, response = client.start_profiling_cpu(100, 30, archive=archive)
match status:
case 200:
log.debug("CPU profiling finished")
profile: Any = Profile()
if archive:
log.debug("Unpacking gzipped profile data")
with gzip.GzipFile(fileobj=io.BytesIO(response)) as f:
response = f.read()
else:
log.debug("Unpacking non-gzipped profile data")
Message.ParseFromString(profile, response)
return profile
case 204:
log.debug("CPU profiling was stopped")
raise HTTPError("Failed to finish CPU profiling: was stopped.")
case 409:
log.debug("CPU profiling is already in progress, cannot start again")
raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.")
case 500:
response_str = response.decode("utf-8", errors="replace")
log.debug(
f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}"
)
raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}")
case _:
log.error(f"Failed to start CPU profiling: {status}")
raise HTTPError(f"Failed to start CPU profiling: {status}")
def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None):
"""
Stop CPU profiling for the compute node.
"""
log.info("Manually stopping CPU profiling...")
if event is not None:
event.set()
status = client.stop_profiling_cpu()
match status:
case 200:
log.debug("CPU profiling stopped successfully")
case 412:
log.debug("CPU profiling is not running, nothing to do")
case _:
log.error(f"Failed to stop CPU profiling: {status}")
raise HTTPError(f"Failed to stop CPU profiling: {status}")
return status
def _wait_till_profiling_starts(
http_client: EndpointHttpClient,
event: threading.Event | None,
repeat_delay_secs: float = 0.3,
timeout: int = 60,
) -> bool:
"""
Wait until CPU profiling starts.
"""
log.info("Waiting for CPU profiling to start...")
end_time = time.time() + timeout
while not http_client.get_profiling_cpu_status():
time.sleep(repeat_delay_secs)
if end_time <= time.time():
log.error("Timeout while waiting for CPU profiling to start")
return False
log.info("CPU profiling has started successfully and is in progress")
if event is not None:
event.set()
return True
def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None):
profile = _start_profiling_cpu(http_client, event)
if profile is None:
log.error("The received profiling data is malformed or empty.")
return
assert len(profile.sample) > 0, "No samples found in CPU profiling data"
assert len(profile.mapping) > 0, "No mappings found in CPU profiling data"
assert len(profile.location) > 0, "No locations found in CPU profiling data"
assert len(profile.function) > 0, "No functions found in CPU profiling data"
assert len(profile.string_table) > 0, "No string tables found in CPU profiling data"
strings = [
"PostgresMain",
"ServerLoop",
"BackgroundWorkerMain",
"pq_recvbuf",
"pq_getbyte",
]
assert any(s in profile.string_table for s in strings), (
f"Expected at least one of {strings} in string table, but none found"
)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with timeout.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test2")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test2")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
assert _wait_till_profiling_starts(http_client, None)
time.sleep(4) # Give some time for the profiling to start
thread2.start()
thread.join(timeout=60)
inserting_should_stop_event.set() # Stop the insertion thread
thread2.join(timeout=60)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with archiving the data.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
assert _start_profiling_cpu(http_client, None, archive=True) is not None, (
"Failed to start and finish CPU profiling with archiving enabled"
)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started and stopped correctly.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling():
# Should raise as the profiling will be stopped.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
_stop_profiling_cpu(http_client, None)
# Should raise as the profiling is already stopped.
assert _stop_profiling_cpu(http_client, None) == 412
thread.join(timeout=60)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started once and the second time
it will throw an error as it is already running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
thread2.start()
# Should raise as the profiling is already in progress.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
inserting_should_stop_event.set() # Stop the insertion thread
# The profiling should still be running and finish normally.
thread.join(timeout=600)
thread2.join(timeout=600)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv):
"""
Test that CPU profiling throws the expected error when is attempted
to be stopped when it hasn't be running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for _ in range(3):
status = _stop_profiling_cpu(http_client, None)
assert status == 412
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv):
"""
Test that CPU profiling start request properly validated the
arguments and throws the expected error (bad request).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for sampling_frequency in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(sampling_frequency, 5)
assert status == 422
for timeout_seconds in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(5, timeout_seconds)
assert status == 422

View File

@@ -2,6 +2,9 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from fixtures.neon_fixtures import StorageControllerApiException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -75,3 +78,38 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
ep.start(safekeeper_generation=1, safekeepers=[3])
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper_migrate validates the new_sk_set before starting the migration.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 2,
}
env = neon_env_builder.init_start()
def expect_fail(sk_set: list[int], match: str):
with pytest.raises(StorageControllerApiException, match=match):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, sk_set
)
# Check that we failed before committing to the database.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == 1
expect_fail([], "safekeeper set is empty")
expect_fail([1], "must have at least 2 safekeepers")
expect_fail([1, 1], "duplicate safekeeper")
expect_fail([1, 100500], "does not exist")
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
sk_set = mconf["sk_set"]
assert len(sk_set) == 2
decom_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
expect_fail([sk_set[0], decom_sk], "decomissioned")

View File

@@ -1673,6 +1673,91 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder):
# END_HADRON
# HADRON
@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
"""
Tests that back pressure knobs are enforced on a per-shard basis instead of at the tenant level.
"""
init_shard_count = 4
neon_env_builder.num_pageservers = init_shard_count
stripe_size = 1
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count,
initial_tenant_shard_stripe_size=stripe_size,
initial_tenant_conf={
# disable auto-flush of shards and set max_replication_flush_lag as 15MB.
# The backpressure parameters must be enforced at the shard level to avoid stalling PG.
"checkpoint_distance": 1 * 1024 * 1024 * 1024,
"checkpoint_timeout": "1h",
},
)
endpoint = env.endpoints.create(
"main",
config_lines=[
"max_replication_write_lag = 0",
"max_replication_apply_lag = 0",
"max_replication_flush_lag = 15MB",
"neon.max_cluster_size = 10GB",
],
)
endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created.
endpoint.start()
# generate 20MB of data
endpoint.safe_psql(
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
)
res = endpoint.safe_psql(
"SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system"
)[0]
assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
endpoint.stop()
# HADRON
def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder):
"""
Tests that shard split can correctly handle page server timeouts and abort the split
"""
init_shard_count = 2
neon_env_builder.num_pageservers = 1
stripe_size = 1
if neon_env_builder.storage_controller_config is None:
neon_env_builder.storage_controller_config = {"shard_split_request_timeout": "5s"}
else:
neon_env_builder.storage_controller_config["shard_split_request_timeout"] = "5s"
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count,
initial_tenant_shard_stripe_size=stripe_size,
)
env.storage_controller.allowed_errors.extend(
[
".*Enqueuing background abort.*",
".*failpoint.*",
".*Failed to abort.*",
".*Exclusive lock by ShardSplit was held.*",
]
)
env.pageserver.allowed_errors.extend([".*request was dropped before completing.*"])
endpoint1 = env.endpoints.create_start(branch_name="main")
env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "pause"))
with pytest.raises(StorageControllerApiException):
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4)
env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "off"))
endpoint1.stop_and_destroy()
def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
"""
Check a scenario when one of the shards is much slower than others.

View File

@@ -209,9 +209,9 @@ def test_ancestor_detach_branched_from(
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline)
# because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different
# as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal"}
# because we do the fullbackup from ancestor at the branch_lsn, the neon.signal and/or zenith.signal is always
# different as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal", "neon.signal"}
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files)
@@ -767,7 +767,7 @@ def test_compaction_induced_by_detaches_in_history(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after
)
# we don't need to skip any files, because zenith.signal will be identical
# we don't need to skip any files, because neon.signal will be identical
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set())

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"db424d42d748f8ad91ac00e28db2c7f2efa42f7f"
"353c725b0c76cc82b15af21d8360d03391dc6814"
],
"v16": [
"16.9",
"7a4c0eacaeb9b97416542fa19103061c166460b1"
"e08c8d5f1576ca0487d14d154510499c5f12adfb"
],
"v15": [
"15.13",
"8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9"
"afd46987f3da50c9146a8aa59380052df0862c06"
],
"v14": [
"14.18",
"9085654ee8022d5cc4ca719380a1dc53e5e3246f"
"8ce1f52303aec29e098309347b57c01a1962e221"
]
}

View File

@@ -28,7 +28,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
const-oid = { version = "0.9", default-features = false, features = ["db", "std"] }
criterion = { version = "0.5", features = ["html_reports"] }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -55,8 +54,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
indexmap = { version = "2", features = ["serde"] }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
@@ -72,6 +70,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
@@ -90,7 +89,6 @@ serde_json = { version = "1", features = ["alloc", "raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spin = { version = "0.9" }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }
@@ -103,7 +101,6 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "transport"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
@@ -123,15 +120,13 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clang-sys = { version = "1", default-features = false, features = ["clang_11_0", "runtime"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
indexmap = { version = "2", features = ["serde"] }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -144,6 +139,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] }
proc-macro2 = { version = "1" }