Compare commits

39 Commits

Author SHA1 Message Date
Konstantin Knizhnik
1c4bed27be Resolve merge conflicts 2025-04-16 08:20:46 +03:00
Konstantin Knizhnik
fdf0f1bdc0 Fix rust formatting 2025-04-16 07:49:47 +03:00
Konstantin Knizhnik
0bdd388dd8 Make it possible to control lazy_slru_download through tenant config 2025-04-16 07:49:47 +03:00
Konstantin Knizhnik
712b4cf83c Update compute_tools/src/compute.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:49:46 +03:00
Konstantin Knizhnik
15b6bb5026 Update libs/compute_api/src/spec.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:49:44 +03:00
Konstantin Knizhnik
61d642e541 Update pageserver/src/page_service.rs
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-04-16 07:48:59 +03:00
Konstantin Knizhnik
1d24b887b8 Refactor construction of basebackup command 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
955175c791 Make clippy happy 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
5fb0bcdd6a Make clippy happy 2025-04-16 07:48:58 +03:00
Konstantin Knizhnik
f146fa86f8 Use lazy SLRU download for all timelines if feature flag is set 2025-04-16 07:48:56 +03:00
Konstantin Knizhnik
961008116b Use lazy SLRU download for all timelines if feature flag is set 2025-04-16 07:47:55 +03:00
Konstantin Knizhnik
42d2d3addc Fix checking lazy SLRU download condition 2025-04-16 07:45:35 +03:00
Konstantin Knizhnik
06d0bed566 Always update lazy_slru_download flag during basebackup 2025-04-16 07:45:35 +03:00
Konstantin Knizhnik
aa367e5d82 Add lazy_slru_download_threshold parameter to page server config 2025-04-16 07:45:33 +03:00
Konstantin Knizhnik
6b76e1c526 Add lazy_slru_download compute feature flag 2025-04-16 07:42:32 +03:00
Tristan Partin
eadb05f78e Teach neon_local to pass the Authorization header to compute_ctl (#11490)
This allows us to remove hacks in the compute_ctl authorization
middleware which allowed for bypasses of auth checks.

Fixes: https://github.com/neondatabase/neon/issues/11316

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-15 17:27:49 +00:00
Fedor Dikarev
c5115518e9 remove temp file from repo (#11586)
## Problem
In https://github.com/neondatabase/neon/pull/11409 we added a temp file to
the repo.

## Summary of changes
Remove temp file from the repo.
2025-04-15 15:29:15 +00:00
Alex Chi Z.
931f8c4300 fix(pageserver): check if cancelled before waiting logical size (2/2) (#11575)
## Problem

Closes https://github.com/neondatabase/neon/issues/11486, following up on
https://github.com/neondatabase/neon/pull/11531

## Summary of changes

This patch fixes the remaining 50% of the instability of
`test_create_churn_during_restart`. During tenant warmup, we'll request
logical size; however, if the startup gets cancelled, we won't be able
to spawn the initial logical size calculation task that sets the
`cancel_wait_for_background_loop_concurrency_limit_semaphore`.

Therefore, we check `cancelled` before proceeding to get
`cancel_wait_for_background_loop_concurrency_limit_semaphore`. There
will still be a race if the timeline shutdown happens after L5710 and
before L5711, but it should be enough to reduce the flakiness of the
test.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-15 15:16:16 +00:00
Alexander Bayandin
0f7c2cc382 CI(release): add time to RC PR branch names (#11547)
## Problem

We can't have more than one open release PR created on the same day (due
to non-unique enough branch names).

## Summary of changes
- Add time (hours and minutes) to RC PR branch names
- Also make sure we use UTC for releases
2025-04-15 15:08:05 +00:00
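The branch naming scheme from this change can be sketched in Rust as follows. This only illustrates the resulting format; the workflow itself builds the name in bash with `date -u`, and the function name and its parameters are hypothetical.

```rust
/// Builds a release-candidate branch name from a release branch and a UTC
/// timestamp. Hypothetical helper: the real workflow uses `date -u` in bash.
fn rc_branch_name(
    release_branch: &str,
    year: u16,
    month: u8,
    day: u8,
    hour: u8,
    minute: u8,
) -> String {
    // Date and time are zero-padded; the trailing "UTC" mirrors the `%Z`
    // field that `date -u` prints.
    format!("rc/{release_branch}/{year:04}-{month:02}-{day:02}_{hour:02}-{minute:02}-UTC")
}
```

Because hours and minutes are part of the name, two release PRs opened on the same day get distinct branches unless they are created within the same minute.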
Erik Grinaker
983d56502b pageserver: reduce shard ancestor rewrite threshold to 30% (#11582)
## Problem

When doing power-of-two shard splits (i.e. 4 → 8 → 16), we end up
rewriting all layers since half of the pages will be local due to
striping. This causes a lot of resource usage when splitting large
tenants.

## Summary of changes

Drop the threshold of local/total pages to 30%, to reduce the amount of
layer rewrites after splits.
2025-04-15 14:26:29 +00:00
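To see why the threshold matters, here is a minimal sketch of the decision. It assumes a layer is rewritten when its local/total page ratio is at or below the threshold; the function name and the exact comparison are assumptions, not the pageserver's actual code.

```rust
/// Hypothetical helper: decides whether a layer is worth rewriting after a
/// shard split, based on the fraction of its pages that are still local.
fn should_rewrite_layer(local_pages: u64, total_pages: u64, threshold: f64) -> bool {
    if total_pages == 0 {
        // Nothing to measure; treat an empty layer as not worth rewriting.
        return false;
    }
    (local_pages as f64 / total_pages as f64) <= threshold
}
```

After a power-of-two split, about half the pages of each layer stay local. Under this assumption, `should_rewrite_layer(50, 100, 0.5)` is true (every layer gets rewritten), while `should_rewrite_layer(50, 100, 0.3)` is false, so such layers are left alone until a later split drops the local fraction further.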
Erik Grinaker
bcef542d5b pageserver: don't rewrite invisible layers during ancestor compaction (#11580)
## Problem

Shard ancestor compaction can be very expensive following shard splits
of large tenants. We currently rewrite garbage layers after shard splits
as well, which can be a significant amount of data.

Touches https://github.com/neondatabase/cloud/issues/22532.

## Summary of changes

Don't rewrite invisible layers after shard splits.
2025-04-15 14:25:58 +00:00
a-masterov
e31455d936 Add the tests for the extensions pg_jsonschema and pg_session_jwt (#11323)
## Problem
`pg_jsonschema` and `pg_session_jwt` are not yet covered by tests
## Summary of changes
Added the tests for these extensions.
2025-04-15 14:06:01 +00:00
Alex Chi Z.
a4ea7d6194 fix(pageserver): gc-compaction verification false failure (#11564)
## Problem

https://github.com/neondatabase/neon/pull/11515 introduced a bug that
some key history cannot be verified.

If a key only exists above the horizon, the verification will fail for
its first occurrence because the history does not exist at that point.

As gc-compaction skips a key range whenever an error occurs, it might be
doing some wasted work in staging/prod now. But I'm not planning a
hotfix this week as the bug doesn't affect correctness/performance.

## Summary of changes

Allow keys with only above horizon history in the verification.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-15 13:58:32 +00:00
Alexander Bayandin
19bea5fd0c CI: do not wait for tests to trigger deploy job (#11548)
## Problem

There is too much delay between merging a PR into `main` and deploying
the changes to staging

## Summary of changes
- Trigger `deploy` job without waiting for `build-and-test-locally` job
2025-04-15 11:23:41 +00:00
a-masterov
5be94e28c4 Update the documentation of the cloud regress test (#11539)
## Problem
The information in the README.md contained errors, and some information
was missing.
## Summary of changes
Found errors are fixed, and new information is added.

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-04-15 11:00:25 +00:00
Alexander Bayandin
63a106021a CI(allure-report-generate): Install allure to /tmp (#11579)
## Problem

The `/__w/neon/neon` directory is mounted from host to container and
persists between runs.
Sometimes the next workflow run fails to delete it:

```
Deleting the contents of '/__w/neon/neon'
Error: File was unable to be removed Error: EACCES: permission denied, rmdir '/__w/neon/neon/allure-2.32.2/bin'
```

## Summary of changes
- Download and install allure to `/tmp` which exists in container only

Ref https://github.com/neondatabase/cloud/issues/27186
2025-04-15 09:29:36 +00:00
Fedor Dikarev
9a6ace9bde introduce new runners: unit-perf and use them for benchmark jobs (#11409)
## Problem
Benchmarks results are inconsistent on existing small-metal runners

## Summary of changes
Introduce new `unit-perf` runners and run benchmarks on them.

The new hardware has slower, but consistent, CPU frequency - if run with
default governor schedutil.
Thus we needed to adjust some testcases' timeouts and add some retry
steps where hard-coded timeouts couldn't be increased without changing
the system under test.
- [wait_for_last_record_lsn](6592d69a67/test_runner/fixtures/pageserver/utils.py (L193)) 1000s -> 2000s
- [test_branch_creation_many](https://github.com/neondatabase/neon/pull/11409/files#diff-2ebfe76f89004d563c7e53e3ca82462e1d85e92e6d5588e8e8f598bbe119e927) 1000s
- [test_ingest_insert_bulk](https://github.com/neondatabase/neon/pull/11409/files#diff-e90e685be4a87053bc264a68740969e6a8872c8897b8b748d0e8c5f683a68d9f)
  - with back throttling disabled compute becomes unresponsive for more than 60 seconds (PG hard-coded client authentication connection timeout)
- [test_sharded_ingest](https://github.com/neondatabase/neon/pull/11409/files#diff-e8d870165bd44acb9a6d8350f8640b301c1385a4108430b8d6d659b697e4a3f1) 600s -> 1200s

Right now there are only 2 runners of that class. If we decide to go
with them, we have to check how many runners of that type we need, so
that jobs don't get stuck waiting for one to become available.

However, we have now decided to run those runners with the performance
governor instead of schedutil.
This achieves almost the same performance as the previous runners while
still giving consistent results for the same commit.

Related issue to activate performance governor on these runners
https://github.com/neondatabase/runner/pull/138

## Verification that it helps

### analyze runtimes on new runner for same commit

Table of runtimes for the same commit on different runners in
[run](https://github.com/neondatabase/neon/actions/runs/14417589789)

| Run | Benchmarks (1) | Benchmarks (2) | Benchmarks (3) | Benchmarks (4) | Benchmarks (5) |
|--------|--------|---------|---------|---------|---------|
| 1 | 1950.37s | 6374.55s | 3646.15s | 4149.48s | 2330.22s |
| 2 | - | 6369.27s | 3666.65s | 4162.42s | 2329.23s |
| Delta % | - | 0.07 % | 0.5 % | 0.3 % | 0.04 % |
| with governor performance | 1519.57s | 4131.62s | - | - | - |
| second run gov. perf. | 1513.62s | 4134.67s | - | - | - |
| Delta % | 0.3 % | 0.07 % | - | - | - |
| speedup gov. performance | 22 % | 35 % | - | - | - |
| current desktop class Hetzner runners (main) | 1487.10s | 3699.67s | - | - | - |
| slower than desktop class | 2 % | 12 % | - | - | - |

In summary, the runtimes for the same commit on this hardware vary by
less than 1 %.

---------

Co-authored-by: BodoBolero <peterbendel@neon.tech>
2025-04-15 08:21:44 +00:00
Erik Grinaker
8c77ccfc01 pageserver: log total progress during shard ancestor compaction (#11565)
## Problem

Shard ancestor compaction doesn't currently log any global progress
information, only for the current batch.

## Summary of changes

Log the number of layers checked for eligibility this iteration, and the
total number of layers to check. This will indicate how far along the
total shard ancestor compaction has gotten for this iteration.
2025-04-15 07:25:09 +00:00
Tristan Partin
cbd2fc2395 Clean up logs and error messages in compute_ctl authorize middleware (#11576)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-15 01:21:18 +00:00
Tristan Partin
028a191040 Continue with s/spec/config changes (#11574)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-14 21:18:21 +00:00
Vlad Lazar
8cce27bedb pageserver: add a randomized read path test (#11519)
## Problem

Every time we make changes to the read path to fix a bug or add a
feature,
we end up adding another incomprehensible test.

## Summary of changes

Add some generic infrastructure for generating a layer map from a type
spec
and use that for a read path test. The test is randomized but uses a
fixed seed
by default. A fuzzing mode is available for confidence building.

See [Notion
page](https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4)
for a diagram of the layer map
used.

Just for fun I tried removing [this
commit](9990199cb4)
from https://github.com/neondatabase/neon/pull/11494
and it caught the bug in the normal mode (no fuzzing required).
2025-04-14 15:31:32 +00:00
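The "randomized but fixed seed by default" approach can be sketched as below. This is a generic illustration, not the test's real infrastructure: the xorshift generator, `DEFAULT_SEED`, `pick_seed` and `generate_keys` are all hypothetical names standing in for whatever the pageserver test uses.

```rust
/// Minimal xorshift64 generator; a stand-in for the RNG a real test would use.
struct XorShift64(u64);

impl XorShift64 {
    fn new(seed: u64) -> Self {
        // xorshift must not start from zero, so substitute a fixed constant.
        Self(if seed == 0 { 0x9E37_79B9_7F4A_7C15 } else { seed })
    }

    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Hypothetical default seed used for reproducible runs.
const DEFAULT_SEED: u64 = 42;

/// Normal mode uses the fixed seed; a fuzzing mode would pass `Some(seed)`.
fn pick_seed(override_seed: Option<u64>) -> u64 {
    override_seed.unwrap_or(DEFAULT_SEED)
}

/// Generates `count` pseudo-random keys in `0..key_space` (`key_space > 0`).
fn generate_keys(seed: u64, count: usize, key_space: u64) -> Vec<u64> {
    let mut rng = XorShift64::new(seed);
    (0..count).map(|_| rng.next_u64() % key_space).collect()
}
```

With the default seed every run exercises the same inputs, so a failure is reproducible; a fuzzing run only needs to log the seed it picked to make its failures reproducible too.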
Vlad Lazar
90b706cd96 tests: save pageserver metrics at the end of the test (#11559)
## Problem

Sometimes it's useful to see the pageserver metrics after a test in
order to debug stuff.
For example, for https://github.com/neondatabase/neon/issues/11465 I'd
like to know
what the remote storage latencies are from the client.

## Summary of changes

When stopping the env, record the pageserver metrics into a file in the
pageserver's workdir.
2025-04-14 15:13:20 +00:00
Alex Chi Z.
057ce115de fix(test): allow stale generation errors (1/2) (#11531)
## Problem

Part of https://github.com/neondatabase/neon/issues/11486

## Summary of changes

50% of the instability of `test_create_churn_during_restart` is
due to a changed error message. Allow the new error message.

Still need to fix other errors due to failure to acquire semaphore in
this or the next patch.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-14 14:51:17 +00:00
Vlad Lazar
e85607eed8 tests: remove config tweak allowing old versions to start with a batching config (#11560)
## Problem

Pageservers now ignore unknown config fields, so this config tweaking is
no longer needed.

## Summary of changes

Get rid of the hack.

Closes https://github.com/neondatabase/neon/issues/11524
2025-04-14 14:42:35 +00:00
Tristan Partin
437071888e Fix logging in nightly physical replication benchmarks (#11541)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-14 13:57:33 +00:00
Vlad Lazar
148b3701cf pageserver: add metrics for get page batch breaking reasons (#11545)
## Problem

https://github.com/neondatabase/neon/pull/11494 changes the batching
logic, but we don't have a way to evaluate it.

## Summary of changes

This PR introduces a global and per timeline metric which tracks the
reason for
which a batch was broken.
2025-04-14 13:24:47 +00:00
Christian Schwarz
daebe50e19 refactor: plumb gate and cancellation down to blob_io::BlobWriter (#11543)
In #10063 we will switch BlobWriter to use the owned buffers IO buffered
writer, which implements double-buffering by virtue of a background task
that performs the flushing.

That task's lifecycle must be contained within the Timeline lifecycle,
so, it must hold the timeline gate open and respect Timeline::cancel.

This PR does the noisy plumbing to reduce the #10063 diff.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
2025-04-14 11:51:01 +00:00
Arpad Müller
e0ee6fbeff Remove deprecated --compute-hook-url storcon param (#11551)
We have already migrated the storage controller to
`--control-plane-url`, added in #11173. The new param was added to
support also safekeeper specific endpoints. See the docs changes in
#11195 for further details.

Part of #11163
2025-04-14 10:36:40 +00:00
Konstantin Knizhnik
307fa2ceb7 Remove unused n_synced variable from HandleSafekeeperResponse (#11553)
## Problem

clang produces a warning about the unused variable `n_synced` in
HandleSafekeeperResponse

## Summary of changes

Remove local variable.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-14 09:45:13 +00:00
63 changed files with 1732 additions and 570 deletions

View File

@@ -6,6 +6,7 @@ self-hosted-runner:
- small
- small-metal
- small-arm64
- unit-perf
- us-east-2
config-variables:
- AWS_ECR_REGION

View File

@@ -70,6 +70,7 @@ runs:
- name: Install Allure
shell: bash -euxo pipefail {0}
working-directory: /tmp
run: |
if ! which allure; then
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip

View File

@@ -53,10 +53,13 @@ jobs:
|| inputs.component-name == 'Compute' && 'release-compute'
}}
run: |
today=$(date +'%Y-%m-%d')
echo "title=${COMPONENT_NAME} release ${today}" | tee -a ${GITHUB_OUTPUT}
echo "rc-branch=rc/${RELEASE_BRANCH}/${today}" | tee -a ${GITHUB_OUTPUT}
echo "release-branch=${RELEASE_BRANCH}" | tee -a ${GITHUB_OUTPUT}
now_date=$(date -u +'%Y-%m-%d')
now_time=$(date -u +'%H-%M-%Z')
{
echo "title=${COMPONENT_NAME} release ${now_date}"
echo "rc-branch=rc/${RELEASE_BRANCH}/${now_date}_${now_time}"
echo "release-branch=${RELEASE_BRANCH}"
} | tee -a ${GITHUB_OUTPUT}
- name: Configure git
run: |

View File

@@ -284,7 +284,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, small-metal ]
runs-on: [ self-hosted, unit-perf ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
@@ -1271,7 +1271,7 @@ jobs:
exit 1
deploy:
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, build-and-test-locally, trigger-custom-extensions-build-and-wait ]
needs: [ check-permissions, push-neon-image-dev, push-compute-image-dev, push-neon-image-prod, push-compute-image-prod, meta, trigger-custom-extensions-build-and-wait ]
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `push-neon-image-prod` and `push-compute-image-prod`
if: ${{ contains(fromJSON('["push-main", "storage-release", "proxy-release", "compute-release"]'), needs.meta.outputs.run-kind) && !failure() && !cancelled() }}
permissions:

14
Cargo.lock generated
View File

@@ -1416,6 +1416,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.13.1",
"camino",
"clap",
"comfy-table",
@@ -1425,10 +1426,13 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"jsonwebtoken",
"nix 0.27.1",
"once_cell",
"pageserver_api",
"pageserver_client",
"pem",
"pkcs8 0.10.2",
"postgres_backend",
"postgres_connection",
"regex",
@@ -1437,6 +1441,7 @@ dependencies = [
"scopeguard",
"serde",
"serde_json",
"sha2",
"storage_broker",
"thiserror 1.0.69",
"tokio",
@@ -2817,6 +2822,7 @@ dependencies = [
"hyper 0.14.30",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"once_cell",
"pprof",
@@ -4269,6 +4275,7 @@ dependencies = [
"hyper 0.14.30",
"indoc",
"itertools 0.10.5",
"jsonwebtoken",
"md5",
"metrics",
"nix 0.27.1",
@@ -5685,9 +5692,9 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.13"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
@@ -5988,6 +5995,7 @@ dependencies = [
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"once_cell",
"pageserver_api",
@@ -7872,6 +7880,7 @@ dependencies = [
"metrics",
"nix 0.27.1",
"once_cell",
"pem",
"pin-project-lite",
"postgres_connection",
"pprof",
@@ -8460,6 +8469,7 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"pkcs8 0.10.2",
"prettyplease",
"proc-macro2",
"prost 0.13.3",

View File

@@ -141,7 +141,9 @@ parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pem = "3.0.3"
pin-project-lite = "0.2"
pkcs8 = "0.10.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer", "prost-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency

View File

@@ -139,7 +139,7 @@ fn main() -> Result<()> {
let scenario = failpoint_support::init();
// For historical reasons, the main thread that processes the spec and launches postgres
// For historical reasons, the main thread that processes the config and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
// from all parts of compute_ctl.
@@ -155,7 +155,7 @@ fn main() -> Result<()> {
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let cli_spec = get_config(&cli)?;
let config = get_config(&cli)?;
let compute_node = ComputeNode::new(
ComputeNodeParams {
@@ -176,8 +176,7 @@ fn main() -> Result<()> {
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
},
cli_spec.spec,
cli_spec.compute_ctl_config,
config,
)?;
let exit_code = compute_node.run()?;

View File

@@ -11,7 +11,7 @@ use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
@@ -303,11 +303,7 @@ struct StartVmMonitorResult {
}
impl ComputeNode {
pub fn new(
params: ComputeNodeParams,
cli_spec: Option<ComputeSpec>,
compute_ctl_config: ComputeCtlConfig,
) -> Result<Self> {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
let conn_conf = postgres::config::Config::from_str(connstr)
.context("cannot build postgres config from connstr")?;
@@ -315,8 +311,8 @@ impl ComputeNode {
.context("cannot build tokio postgres config from connstr")?;
let mut new_state = ComputeState::new();
if let Some(cli_spec) = cli_spec {
let pspec = ParsedSpec::try_from(cli_spec).map_err(|msg| anyhow::anyhow!(msg))?;
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
}
@@ -327,7 +323,7 @@ impl ComputeNode {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_download_progress: RwLock::new(HashMap::new()),
compute_ctl_config,
compute_ctl_config: config.compute_ctl_config,
})
}
@@ -901,32 +897,28 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
Lsn(0) => {
if spec.spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id
)
} else {
format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id)
}
}
_ => {
if spec.spec.mode != ComputeMode::Primary {
format!(
"basebackup {} {} {} --gzip --replica",
spec.tenant_id, spec.timeline_id, lsn
)
} else {
format!(
"basebackup {} {} {} --gzip",
spec.tenant_id, spec.timeline_id, lsn
)
}
}
};
let tenant_id = spec.tenant_id.to_string();
let timeline_id = spec.timeline_id.to_string();
let lsn_str = lsn.to_string();
let mut cmd = Vec::new();
cmd.push("basebackup");
cmd.push(&tenant_id);
cmd.push(&timeline_id);
if lsn != Lsn::INVALID {
cmd.push(&lsn_str);
}
cmd.push("--gzip");
if spec.spec.mode != ComputeMode::Primary {
cmd.push("--replica");
}
if spec
.spec
.features
.contains(&ComputeFeature::LazySlruDownload)
{
cmd.push("--lazy-slru-download")
}
let basebackup_cmd = cmd.join(" ");
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let mut measured_reader = MeasuredReader::new(copyreader);
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
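The refactored command construction above can be tried in isolation with a small standalone sketch. It mirrors the push-then-join approach from the diff but takes plain strings and booleans instead of the real `ParsedSpec`, `Lsn` and `ComputeFeature` types; the function name and signature are illustrative only.

```rust
/// Assembles the basebackup command string from its parts.
/// `lsn` is `None` when no specific LSN is requested (the diff checks
/// `lsn != Lsn::INVALID` instead).
fn build_basebackup_cmd(
    tenant_id: &str,
    timeline_id: &str,
    lsn: Option<&str>,
    is_replica: bool,
    lazy_slru_download: bool,
) -> String {
    let mut cmd: Vec<&str> = vec!["basebackup", tenant_id, timeline_id];
    if let Some(lsn) = lsn {
        cmd.push(lsn);
    }
    cmd.push("--gzip");
    if is_replica {
        cmd.push("--replica");
    }
    if lazy_slru_download {
        cmd.push("--lazy-slru-download");
    }
    cmd.join(" ")
}
```

Compared with the nested `match`/`if` it replaces, each optional argument is handled exactly once, so adding a flag such as `--lazy-slru-download` does not double the number of `format!` branches.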

View File

@@ -11,7 +11,7 @@ use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
use tower_http::auth::AsyncAuthorizeRequest;
use tracing::warn;
use tracing::{debug, warn};
use crate::http::{JsonResponse, extract::RequestId};
@@ -54,8 +54,8 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
Box::pin(async move {
let request_id = request.extract_parts::<RequestId>().await.unwrap();
// TODO: Remove this stanza after teaching neon_local and the
// regression tests to use a JWT + JWKS.
// TODO(tristan957): Remove this stanza after teaching neon_local
// and the regression tests to use a JWT + JWKS.
//
// https://github.com/neondatabase/neon/issues/11316
if cfg!(feature = "testing") {
@@ -92,7 +92,7 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
if data.claims.compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid claims in authorization token",
"invalid compute ID in authorization token claims",
));
}
@@ -112,12 +112,16 @@ impl Authorize {
token: &str,
validation: &Validation,
) -> Result<TokenData<ComputeClaims>> {
debug_assert!(!jwks.keys.is_empty());
debug!("verifying token {}", token);
for jwk in jwks.keys.iter() {
let decoding_key = match DecodingKey::from_jwk(jwk) {
Ok(key) => key,
Err(e) => {
warn!(
"Failed to construct decoding key from {}: {}",
"failed to construct decoding key from {}: {}",
jwk.common.key_id.as_ref().unwrap(),
e
);
@@ -130,7 +134,7 @@ impl Authorize {
Ok(data) => return Ok(data),
Err(e) => {
warn!(
"Failed to decode authorization token using {}: {}",
"failed to decode authorization token using {}: {}",
jwk.common.key_id.as_ref().unwrap(),
e
);
@@ -140,6 +144,6 @@ impl Authorize {
}
}
Err(anyhow!("Failed to verify authorization token"))
Err(anyhow!("failed to verify authorization token"))
}
}
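The verification loop above follows a "try every key, return on first success" pattern. A dependency-free sketch of that pattern is shown below; `Key`, `toy_verify` and `verify_with_any` are hypothetical stand-ins, and the real code uses `jsonwebtoken`'s `DecodingKey` and `decode` rather than a string comparison.

```rust
/// Hypothetical stand-in for a JWK: just an ID and some key material.
struct Key {
    id: String,
    secret: String,
}

/// Toy verifier used only for this sketch: a token "verifies" if it equals
/// the key's secret. Real verification checks a cryptographic signature.
fn toy_verify(key: &Key, token: &str) -> Result<(), String> {
    if token == key.secret {
        Ok(())
    } else {
        Err("signature mismatch".to_string())
    }
}

/// Tries each key in turn and returns the ID of the first key that verifies
/// the token. Failures are logged and the next key is tried.
fn verify_with_any<F>(keys: &[Key], token: &str, verify: F) -> Result<String, String>
where
    F: Fn(&Key, &str) -> Result<(), String>,
{
    for key in keys {
        match verify(key, token) {
            Ok(()) => return Ok(key.id.clone()),
            Err(e) => eprintln!(
                "failed to decode authorization token using {}: {}",
                key.id, e
            ),
        }
    }
    Err("failed to verify authorization token".to_string())
}
```

Note that an empty key set falls straight through to the final error, which is why the diff adds a `debug_assert!` that `jwks.keys` is not empty.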

View File

@@ -6,13 +6,17 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
base64.workspace = true
camino.workspace = true
clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
humantime.workspace = true
jsonwebtoken.workspace = true
nix.workspace = true
once_cell.workspace = true
pem.workspace = true
pkcs8.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
regex.workspace = true
@@ -20,6 +24,7 @@ reqwest = { workspace = true, features = ["blocking", "json"] }
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
thiserror.workspace = true
toml.workspace = true
toml_edit.workspace = true

View File

@@ -552,6 +552,7 @@ enum EndpointCmd {
Start(EndpointStartCmdArgs),
Reconfigure(EndpointReconfigureCmdArgs),
Stop(EndpointStopCmdArgs),
GenerateJwt(EndpointGenerateJwtCmdArgs),
}
#[derive(clap::Args)]
@@ -699,6 +700,13 @@ struct EndpointStopCmdArgs {
mode: String,
}
#[derive(clap::Args)]
#[clap(about = "Generate a JWT for an endpoint")]
struct EndpointGenerateJwtCmdArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage neon_local branch name mappings")]
enum MappingsCmd {
@@ -1528,6 +1536,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(&args.mode, args.destroy)?;
}
EndpointCmd::GenerateJwt(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let jwt = endpoint.generate_jwt()?;
println!("{jwt}");
}
}
Ok(())

View File

@@ -42,22 +42,29 @@ use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse,
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use pem::Pem;
use pkcs8::der::Decode;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -82,6 +89,7 @@ pub struct EndpointConf {
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
compute_ctl_config: ComputeCtlConfig,
}
//
@@ -137,6 +145,36 @@ impl ComputeControlPlane {
.unwrap_or(self.base_port)
}
/// Create a JSON Web Key Set. This ideally matches the way we create a JWKS
/// from the production control plane.
fn create_jwks_from_pem(pem: Pem) -> Result<JwkSet> {
let document = pkcs8::Document::from_der(&pem.into_contents())?;
let mut hasher = Sha256::new();
hasher.update(&document);
let key_hash = hasher.finalize();
Ok(JwkSet {
keys: vec![Jwk {
common: CommonParameters {
public_key_use: Some(PublicKeyUse::Signature),
key_operations: Some(vec![KeyOperations::Verify]),
key_algorithm: Some(KeyAlgorithm::EdDSA),
key_id: Some(base64::encode_config(key_hash, base64::URL_SAFE_NO_PAD)),
x509_url: None::<String>,
x509_chain: None::<Vec<String>>,
x509_sha1_fingerprint: None::<String>,
x509_sha256_fingerprint: None::<String>,
},
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: base64::encode_config(&document, base64::URL_SAFE_NO_PAD),
}),
}],
})
}
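Both `key_id` and `x` above are encoded with `base64::URL_SAFE_NO_PAD`. For readers unfamiliar with that variant, here is a hand-rolled sketch of URL-safe, unpadded base64 using only the standard library. It is for illustration; the diff uses the `base64` crate, and `base64url_no_pad` is a hypothetical name.

```rust
/// URL-safe base64 alphabet (RFC 4648, section 5): `-` and `_` replace `+` and `/`.
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";

/// Encodes bytes as URL-safe base64 without trailing `=` padding.
fn base64url_no_pad(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() * 4 + 2) / 3);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling the rest.
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);
        // Emit only the characters that carry real input bits (no padding).
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 63) as usize] as char);
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 63) as usize] as char);
        }
    }
    out
}
```

The output is safe to embed in URLs and JSON without escaping, which is why JWK fields use this encoding.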
#[allow(clippy::too_many_arguments)]
pub fn new_endpoint(
&mut self,
@@ -154,6 +192,10 @@ impl ComputeControlPlane {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
let compute_ctl_config = ComputeCtlConfig {
jwks: Self::create_jwks_from_pem(self.env.read_public_key()?)?,
tls: None::<TlsConfig>,
};
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
@@ -181,6 +223,7 @@ impl ComputeControlPlane {
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
});
ep.create_endpoint_dir()?;
@@ -200,6 +243,7 @@ impl ComputeControlPlane {
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
compute_ctl_config,
})?,
)?;
std::fs::write(
@@ -242,7 +286,6 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
endpoint_id: String,
@@ -271,6 +314,9 @@ pub struct Endpoint {
features: Vec<ComputeFeature>,
// Cluster settings
cluster: Option<Cluster>,
/// The compute_ctl config for the endpoint's compute.
compute_ctl_config: ComputeCtlConfig,
}
#[derive(PartialEq, Eq)]
@@ -333,6 +379,7 @@ impl Endpoint {
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
cluster: conf.cluster,
compute_ctl_config: conf.compute_ctl_config,
})
}
@@ -580,6 +627,13 @@ impl Endpoint {
Ok(safekeeper_connstrings)
}
/// Generate a JWT with the correct claims.
pub fn generate_jwt(&self) -> Result<String> {
self.env.generate_auth_token(&ComputeClaims {
compute_id: self.endpoint_id.clone(),
})
}
#[allow(clippy::too_many_arguments)]
pub async fn start(
&self,
@@ -706,7 +760,7 @@ impl Endpoint {
ComputeConfig {
spec: Some(spec),
compute_ctl_config: ComputeCtlConfig::default(),
compute_ctl_config: self.compute_ctl_config.clone(),
}
};
@@ -774,16 +828,7 @@ impl Endpoint {
])
// TODO: It would be nice if we generated compute IDs with the same
// algorithm as the real control plane.
.args([
"--compute-id",
&format!(
"compute-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
),
])
.args(["--compute-id", &self.endpoint_id])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
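Passing the endpoint ID as `--compute-id` matters because `generate_jwt` puts the same ID into the token claims, and the compute_ctl middleware rejects tokens whose `compute_id` differs. A simplified sketch of that check follows; the `ComputeClaims` struct here is a local stand-in for `compute_api::requests::ComputeClaims`, and `check_claims` is a hypothetical helper.

```rust
/// Local stand-in for the claims carried in the JWT.
struct ComputeClaims {
    compute_id: String,
}

/// Accepts the claims only if they name the compute that received the request.
fn check_claims(claims: &ComputeClaims, compute_id: &str) -> Result<(), &'static str> {
    if claims.compute_id != compute_id {
        return Err("invalid compute ID in authorization token claims");
    }
    Ok(())
}
```

With the previous timestamp-based ID (`compute-<unix seconds>`), neon_local had no stable value to put in the claims, so a token minted for an endpoint could not have matched.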
@@ -881,6 +926,7 @@ impl Endpoint {
self.external_http_address.port()
),
)
.bearer_auth(self.generate_jwt()?)
.send()
.await?;
@@ -957,6 +1003,7 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.bearer_auth(self.generate_jwt()?)
.body(
serde_json::to_string(&ConfigurationRequest {
spec,

View File

@@ -12,6 +12,7 @@ use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -56,6 +57,7 @@ pub struct LocalEnv {
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
/// Path to environment's public key
pub public_key_path: PathBuf,
pub broker: NeonBroker,
@@ -758,11 +760,11 @@ impl LocalEnv {
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
let private_key_path = self.get_private_key_path();
let key_data = fs::read(private_key_path)?;
encode_from_key_file(claims, &key_data)
let key = self.read_private_key()?;
encode_from_key_file(claims, &key)
}
/// Get the path to the private key.
pub fn get_private_key_path(&self) -> PathBuf {
if self.private_key_path.is_absolute() {
self.private_key_path.to_path_buf()
@@ -771,6 +773,29 @@ impl LocalEnv {
}
}
/// Get the path to the public key.
pub fn get_public_key_path(&self) -> PathBuf {
if self.public_key_path.is_absolute() {
self.public_key_path.to_path_buf()
} else {
self.base_data_dir.join(&self.public_key_path)
}
}
/// Read the contents of the private key file.
pub fn read_private_key(&self) -> anyhow::Result<Pem> {
let private_key_path = self.get_private_key_path();
let pem = pem::parse(fs::read(private_key_path)?)?;
Ok(pem)
}
/// Read the contents of the public key file.
pub fn read_public_key(&self) -> anyhow::Result<Pem> {
let public_key_path = self.get_public_key_path();
let pem = pem::parse(fs::read(public_key_path)?)?;
Ok(pem)
}
/// Materialize the [`NeonLocalInitConf`] to disk. Called during [`neon_local init`].
pub fn init(conf: NeonLocalInitConf, force: &InitForceMode) -> anyhow::Result<()> {
let base_path = base_path();
@@ -956,6 +981,7 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow
String::from_utf8_lossy(&keygen_output.stderr)
);
}
// Extract the public key from the private key file
//
// openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
@@ -972,6 +998,7 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow
String::from_utf8_lossy(&keygen_output.stderr)
);
}
Ok(())
}

View File

@@ -18,6 +18,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Method};
use serde::de::DeserializeOwned;
@@ -34,8 +35,8 @@ use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Vec<u8>>,
public_key: Option<String>,
private_key: Option<Pem>,
public_key: Option<Pem>,
client: reqwest::Client,
config: NeonStorageControllerConf,
@@ -116,7 +117,9 @@ impl StorageController {
AuthType::Trust => (None, None),
AuthType::NeonJWT => {
let private_key_path = env.get_private_key_path();
let private_key = fs::read(private_key_path).expect("failed to read private key");
let private_key =
pem::parse(fs::read(private_key_path).expect("failed to read private key"))
.expect("failed to parse PEM file");
// If pageserver auth is enabled, this implicitly enables auth for this service,
// using the same credentials.
@@ -138,9 +141,13 @@ impl StorageController {
.expect("Empty key dir")
.expect("Error reading key dir");
std::fs::read_to_string(dent.path()).expect("Can't read public key")
pem::parse(std::fs::read_to_string(dent.path()).expect("Can't read public key"))
.expect("Failed to parse PEM file")
} else {
std::fs::read_to_string(&public_key_path).expect("Can't read public key")
pem::parse(
std::fs::read_to_string(&public_key_path).expect("Can't read public key"),
)
.expect("Failed to parse PEM file")
};
(Some(private_key), Some(public_key))
}

View File

@@ -0,0 +1,8 @@
EXTENSION = pg_jsonschema
DATA = pg_jsonschema--1.0.sql
REGRESS = jsonschema_valid_api jsonschema_edge_cases
REGRESS_OPTS = --load-extension=pg_jsonschema
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -0,0 +1,87 @@
-- Schema with enums, nulls, extra properties disallowed
SELECT jsonschema_is_valid('{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json);
jsonschema_is_valid
---------------------
t
(1 row)
-- Valid enum and null email
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "email": null}'::json
);
jsonschema_validation_errors
------------------------------
{}
(1 row)
-- Invalid enum value
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "disabled", "email": null}'::json
);
jsonschema_validation_errors
----------------------------------------------------------------------
{"\"disabled\" is not one of [\"active\",\"inactive\",\"pending\"]"}
(1 row)
-- Invalid email format (assuming format is validated)
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "email": "not-an-email"}'::json
);
jsonschema_validation_errors
-----------------------------------------
{"\"not-an-email\" is not a \"email\""}
(1 row)
-- Extra property not allowed
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "extra": "should not be here"}'::json
);
jsonschema_validation_errors
--------------------------------------------------------------------
{"Additional properties are not allowed ('extra' was unexpected)"}
(1 row)

View File

@@ -0,0 +1,65 @@
-- Define schema
SELECT jsonschema_is_valid('{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json);
jsonschema_is_valid
---------------------
t
(1 row)
-- Valid instance
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"username": "alice", "age": 25}'::json
);
jsonschema_validation_errors
------------------------------
{}
(1 row)
-- Invalid instance: missing required "username"
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"age": 25}'::json
);
jsonschema_validation_errors
-----------------------------------------
{"\"username\" is a required property"}
(1 row)
-- Invalid instance: wrong type for "age"
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"username": "bob", "age": "twenty"}'::json
);
jsonschema_validation_errors
-------------------------------------------
{"\"twenty\" is not of type \"integer\""}
(1 row)

View File

@@ -0,0 +1,66 @@
-- Schema with enums, nulls, extra properties disallowed
SELECT jsonschema_is_valid('{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json);
-- Valid enum and null email
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "email": null}'::json
);
-- Invalid enum value
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "disabled", "email": null}'::json
);
-- Invalid email format (assuming format is validated)
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "email": "not-an-email"}'::json
);
-- Extra property not allowed
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"status": { "type": "string", "enum": ["active", "inactive", "pending"] },
"email": { "type": ["string", "null"], "format": "email" }
},
"required": ["status"],
"additionalProperties": false
}'::json,
'{"status": "active", "extra": "should not be here"}'::json
);

View File

@@ -0,0 +1,48 @@
-- Define schema
SELECT jsonschema_is_valid('{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json);
-- Valid instance
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"username": "alice", "age": 25}'::json
);
-- Invalid instance: missing required "username"
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"age": 25}'::json
);
-- Invalid instance: wrong type for "age"
SELECT jsonschema_validation_errors(
'{
"type": "object",
"properties": {
"username": { "type": "string" },
"age": { "type": "integer" }
},
"required": ["username"]
}'::json,
'{"username": "bob", "age": "twenty"}'::json
);

View File

@@ -0,0 +1,9 @@
EXTENSION = pg_session_jwt
REGRESS = basic_functions
REGRESS_OPTS = --load-extension=$(EXTENSION)
export PGOPTIONS = -c pg_session_jwt.jwk={"crv":"Ed25519","kty":"OKP","x":"R_Abz-63zJ00l-IraL5fQhwkhGVZCSooQFV5ntC3C7M"}
PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -0,0 +1,35 @@
-- Basic functionality tests for pg_session_jwt
-- Test auth.init() function
SELECT auth.init();
init
------
(1 row)
-- Test an invalid JWT
SELECT auth.jwt_session_init('INVALID-JWT');
ERROR: invalid JWT encoding
-- Test creating a session with an expired JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
ERROR: Token used after it has expired
-- Test creating a session with a valid JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
jwt_session_init
------------------
(1 row)
-- Test auth.session() function
SELECT auth.session();
session
-------------------------------------------------------------------------
{"exp": 4896164252, "iat": 1742564252, "jti": 434343, "sub": "user123"}
(1 row)
-- Test auth.user_id() function
SELECT auth.user_id() AS user_id;
user_id
---------
user123
(1 row)

View File

@@ -0,0 +1,19 @@
-- Basic functionality tests for pg_session_jwt
-- Test auth.init() function
SELECT auth.init();
-- Test an invalid JWT
SELECT auth.jwt_session_init('INVALID-JWT');
-- Test creating a session with an expired JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
-- Test creating a session with a valid JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
-- Test auth.session() function
SELECT auth.session();
-- Test auth.user_id() function
SELECT auth.user_id() AS user_id;

View File

@@ -1,242 +0,0 @@
# Storage Encryption Key Management
## Summary
As a precursor to adding new encryption capabilities to Neon's storage services, this RFC proposes
mechanisms for creating and storing fine-grained encryption keys for user data in Neon. We aim
to provide at least tenant granularity, but will use timeline granularity when it is simpler to do
so.
Out of scope:
- We describe an abstract KMS interface, but not particular platform implementations (such as how
to authenticate with KMS).
## Terminology
_wrapped/unwrapped_: a wrapped encryption key is a key encrypted by another key. For example, the key for
encrypting a timeline's pageserver data might be wrapped by some "root" key for the tenant's user account, stored in a KMS system.
_key hierarchy_: the relationships between keys which wrap each other. For example, a layer file key might
be wrapped by a pageserver tenant key, which is wrapped by a tenant's root key.
## Design Choices
Storage: S3 will be the store of record for wrapped keys.
Separate keys: Safekeeper and Pageserver will use independent keys.
AES256: rather than building a generic system for keys, we will assume that all the keys
we manage are AES256 keys -- this is the de-facto standard for enterprise data storage.
Per-object keys: rather than encrypting data objects (layer files and segment files) with
the tenant keys directly, they will be encrypted with separate keys. This avoids cryptographic
safety issues from re-using the same key for large quantities of potentially repetitive plaintext.
S3 objects are self-contained: each encrypted file will have a metadata block in the file itself
storing the KMS-wrapped key to decrypt itself.
Key storage is optional at a per-tenant granularity: eventually this would be on by default, but:
- initially only some environments will have a KMS set up.
- Encryption has some overhead and it may be that some tenants don't want or need it.
## Design
### Summary of format changes
- Pageserver layer files and safekeeper segment objects are split into blocks and each
block is encrypted by the layer key.
- Pageserver layer files and safekeeper segment objects get new metadata fields to
store wrapped layer key and the KMS-wrapped timeline key.
### Summary of API changes
- Pageserver TenantConf API gets a new field for account ID
- Pageserver TenantConf API gets a new field for encryption mode
- Safekeeper timeline creation API gets a new field for account ID
- Controller, pageserver & safekeeper get a new timeline-scoped `rotate_key` API
### KMS interface
Neon will interoperate with different KMS APIs on different platforms. We will implement a generic interface,
similar to how `remote_storage` wraps different object storage APIs:
- `generate(accountId, keyType, alias) -> (wrapped key, plaintext key)`
- `unwrap(accountId, ciphertext key) -> plaintext key`
Hereafter, when we talk about generating or unwrapping a key, this means a call into the KMS API.
The KMS deals with abstract "account IDs", which are not equal to tenant IDs and may not be
1:1 with tenants. The account ID will be provided as part of tenant configuration, along
with a field to identify an encryption mode.
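As an illustration only, the two-call interface above could be sketched as a Rust trait. Every name here (`Kms`, `KeyType`, `KmsError`, `GeneratedKey`, `MockKms`) is hypothetical and not defined by this RFC. The mock's XOR "wrapping" and counter-derived keys are stand-ins for testing, not cryptography.

```rust
use std::collections::HashMap;

/// Key type selector. The RFC assumes all managed keys are AES256.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KeyType {
    Aes256,
}

#[derive(Debug, PartialEq, Eq)]
pub enum KmsError {
    UnknownAccount,
    MalformedKey,
}

/// Result of `generate`: the same key in wrapped and plaintext form.
#[derive(Debug)]
pub struct GeneratedKey {
    pub wrapped: Vec<u8>,
    pub plaintext: [u8; 32],
}

/// Hypothetical generic KMS interface mirroring the two calls listed above.
pub trait Kms {
    fn generate(
        &mut self,
        account_id: &str,
        key_type: KeyType,
        alias: &str,
    ) -> Result<GeneratedKey, KmsError>;

    fn unwrap(&self, account_id: &str, wrapped: &[u8]) -> Result<[u8; 32], KmsError>;
}

/// In-memory stand-in for tests. XOR with a per-account root key is NOT
/// encryption, and the counter-derived keys are NOT random.
pub struct MockKms {
    roots: HashMap<String, [u8; 32]>,
    counter: u8,
}

impl MockKms {
    pub fn new() -> Self {
        MockKms { roots: HashMap::new(), counter: 0 }
    }

    pub fn add_account(&mut self, account_id: &str, root: [u8; 32]) {
        self.roots.insert(account_id.to_string(), root);
    }

    fn root(&self, account_id: &str) -> Result<[u8; 32], KmsError> {
        self.roots.get(account_id).copied().ok_or(KmsError::UnknownAccount)
    }
}

impl Kms for MockKms {
    fn generate(
        &mut self,
        account_id: &str,
        _key_type: KeyType,
        _alias: &str,
    ) -> Result<GeneratedKey, KmsError> {
        let root = self.root(account_id)?;
        self.counter = self.counter.wrapping_add(1);
        let plaintext = [self.counter; 32];
        // "Wrap" the new key under the account's root key (toy XOR).
        let wrapped = plaintext.iter().zip(root.iter()).map(|(p, r)| p ^ r).collect();
        Ok(GeneratedKey { wrapped, plaintext })
    }

    fn unwrap(&self, account_id: &str, wrapped: &[u8]) -> Result<[u8; 32], KmsError> {
        let root = self.root(account_id)?;
        if wrapped.len() != root.len() {
            return Err(KmsError::MalformedKey);
        }
        let mut plaintext = [0u8; 32];
        for (out, (w, r)) in plaintext.iter_mut().zip(wrapped.iter().zip(root.iter())) {
            *out = w ^ r;
        }
        Ok(plaintext)
    }
}
```

A platform-specific implementation would replace `MockKms` behind the same trait, much as `remote_storage` backends sit behind one interface.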
### Pageserver Layer File Format
Encryption blocks are the minimum unit of read: to read any part of the data within an encryption block,
we must decrypt the whole block. All encryption blocks within a layer share the same layer key (is this safe?).
Image layers: each image is one encryption block.
Delta layers: for the first stage of the project, each delta is encrypted separately; in the future, we can batch
several small deltas into a single encryption block.
Indices: each B+ tree node is an encryption block.
Layer format:
```
| Data Block | Data Block | Data Block | ... | Index Block | Index Block | Index Block | Metadata |
Data block = encrypt(data, layer_key)
Index block = encrypt(index, layer_key); index maps a key to an offset of the data block inside the layer file.
Metadata = wrap(layer_key, timeline_key), wrap_kms(tenant_key), and other metadata we want to store in the future
```
Note that we generate a random layer_key for each layer. We store the layer key wrapped by the current
tenant key (described in later sections) and the KMS-wrapped tenant key in the layer.
If data compression is enabled, the data is compressed before being encrypted (is this safe?)
This file format is used across both object storage and local storage. We do not decrypt when downloading
the layer file to the disk. Decryption is done when reading the layer.
### Layer File Format Migration
We record the file format of each layer file in both the index_part and the layer file name (suffix v2?).
The layer file format version will be passed into the layer readers. The re-keying operation (described below)
will migrate all layer files automatically to v2.
### Safekeeper Segment Format
TBD
### Pageserver Timeline Index
We will add a `created_at` field for each layer file so that during re-keying (described in later sections)
we can determine which layer files to rewrite. We also record the offset of the metadata block so that it is
possible to obtain more information about the layer file without downloading the full layer file (i.e., the
exact timeline key being used to encrypt the layer file).
```
# LayerFileMetadata
{
"format": 2,
"created_at": "<time>",
"metadata_block_offset": u64,
}
```
TODO: create an index for safekeeper so that it's faster to determine what files to re-key? Or we can scan all
files.
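A minimal sketch of how the index entry above might drive the rewrite decision. The struct layout, the `ENCRYPTED_FORMAT` constant, and `needs_rewrite` are hypothetical. Two things are assumptions rather than RFC statements: that `created_at` is stored as seconds since the Unix epoch, and that a layer created before the oldest acceptable key existed must be using an older key.

```rust
/// Hypothetical in-memory form of the `LayerFileMetadata` index entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LayerFileMetadata {
    pub format: u32,
    /// Assumption: creation time in seconds since the Unix epoch.
    pub created_at: u64,
    pub metadata_block_offset: u64,
}

/// Layer file format version that carries encryption metadata (v2 in this RFC).
pub const ENCRYPTED_FORMAT: u32 = 2;

/// Decide whether re-keying should rewrite this layer.
///
/// `oldest_acceptable_key_at` is the creation time of the oldest key we still
/// accept. Layers in a pre-encryption format are always rewritten, since
/// re-keying also migrates them to v2.
pub fn needs_rewrite(meta: &LayerFileMetadata, oldest_acceptable_key_at: u64) -> bool {
    meta.format < ENCRYPTED_FORMAT || meta.created_at < oldest_acceptable_key_at
}
```

Because the decision uses only index fields, layer files would not need to be downloaded to select them.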
### Pageserver Key Cache
We keep a hashmap from KMS-wrapped tenant key to plaintext key for each tenant so that we do not need to repeatedly
unwrap the same key.
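One way such a cache could look, as a sketch rather than a design. `KeyCache` and `get_or_unwrap` are hypothetical names, and the `unwrap` closure stands in for the KMS unwrap call. Eviction and zeroing of plaintext keys are deliberately left out.

```rust
use std::collections::HashMap;

/// Hypothetical per-tenant cache from KMS-wrapped key bytes to the plaintext key.
pub struct KeyCache {
    unwrapped: HashMap<Vec<u8>, [u8; 32]>,
}

impl KeyCache {
    pub fn new() -> Self {
        KeyCache { unwrapped: HashMap::new() }
    }

    /// Return the cached plaintext key, calling `unwrap` (the KMS round trip)
    /// only on a miss. Failed unwraps are not cached.
    pub fn get_or_unwrap<E>(
        &mut self,
        wrapped: &[u8],
        unwrap: impl FnOnce(&[u8]) -> Result<[u8; 32], E>,
    ) -> Result<[u8; 32], E> {
        if let Some(key) = self.unwrapped.get(wrapped) {
            return Ok(*key);
        }
        let key = unwrap(wrapped)?;
        self.unwrapped.insert(wrapped.to_vec(), key);
        Ok(key)
    }

    /// Number of plaintext keys currently held.
    pub fn cached_keys(&self) -> usize {
        self.unwrapped.len()
    }
}
```

Keying the map by the wrapped bytes means layers written under different tenant keys, for example before and after a rotation, resolve to separate entries.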
### Key rotation
Each tenant stores a tenant key in memory to encrypt all layer files generated across all timelines within
its active period. When the key rotation API gets called, we rotate the timeline key in memory by calling the
KMS API to generate a new key-pair, and all new layer files' layer keys will be encrypted using this key.
### Re-keying
While re-keying and key-rotation are sometimes used synonymously, we distinguish them:
- Key rotation is generating a new key to use for new data
- Re-keying is rewriting existing data so that old keys are no longer used at all
Re-keying is a bulk data operation, and not fully defined in this RFC: it can be defined
quite simply as "For object in objects, if object key version is < the rekeying horizon,
then do a read/write cycle on the object using latest key". This is a simple but potentially very
expensive operation, so we discuss efficiency here.
#### Pageserver re-key
For pageservers, occasional rekeying may be implemented efficiently if one tolerates using
the last few keys and doesn't insist on the latest, because pageservers periodically rewrite
their data for GC-compaction anyway. Thereby an API call to re-key any data with an overly old
key would often be a no-op because all data was rewritten recently anyway.
When object versioning is enabled in storage, re-keying is not fully accomplished by just
re-writing live data: old versions would still contain user data encrypted with older keys. To
fully re-key, an extra step is needed to purge old objects. Ideally, we should only purge
old objects which were encrypted using old keys. To this end, it would be useful to store
the encryption key version as metadata on objects, so that a scrub of deleted object versions
can efficiently select those objects that should be purged during re-key.
Checks on object versions should not only be on deleted objects: because pageserver can emit
"orphan" objects not referenced in the index under some circumstances, re-key must also
check non-deleted objects.
To summarize, the pageserver re-key operation is:
- Iterate over index of layer files, select those with too-old key and rewrite them
- Iterate over all versions in object storage, select those with a too-old key version
in their metadata and purge them (with a safety check that these are not referenced
by the latest index).
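The two steps above can be sketched as pure selection functions. All types and names here are hypothetical, and the sketch assumes each object carries a numeric key version in its metadata and that object versions have unique IDs. The actual rewriting and deletion are out of scope.

```rust
use std::collections::HashSet;

/// Hypothetical index entry: a layer and the key version that encrypted it.
#[derive(Debug, Clone)]
pub struct IndexedLayer {
    pub name: String,
    pub key_version: u64,
}

/// Hypothetical object-storage version (live or deleted) with key metadata.
#[derive(Debug, Clone)]
pub struct StoredVersion {
    pub version_id: String,
    pub key_version: u64,
}

/// Step 1: layers in the index whose key is older than the re-keying horizon.
pub fn layers_to_rewrite(index: &[IndexedLayer], horizon: u64) -> Vec<String> {
    index
        .iter()
        .filter(|layer| layer.key_version < horizon)
        .map(|layer| layer.name.clone())
        .collect()
}

/// Step 2: stored versions with a too-old key, skipping anything the latest
/// index still references (the safety check).
pub fn versions_to_purge(
    versions: &[StoredVersion],
    referenced_by_latest_index: &HashSet<String>,
    horizon: u64,
) -> Vec<String> {
    versions
        .iter()
        .filter(|v| v.key_version < horizon)
        .filter(|v| !referenced_by_latest_index.contains(&v.version_id))
        .map(|v| v.version_id.clone())
        .collect()
}
```

Step 2 scans every stored version rather than only deleted ones, so orphan objects that the index never referenced are selected too.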
It would be wise to combine the re-key procedure with an exhaustive read of a timeline's data,
to ensure that when testing & rolling this feature out we are not rendering anything unreadable
due to bugs in implementation. Since we are deleting old versions in object storage, our
time travel recovery tool will not be any help if we get something wrong in this process.
#### Safekeeper re-key
Re-keying a safekeeper timeline requires an exhaustive walk of segment objects, reading
metadata on each one and deciding whether it requires a rewrite.
Safekeeper currently keeps historic objects forever, so re-keying this data will get
more expensive as time goes on. This would be a good time to add cleanup of old safekeeper
segments, but doing so is beyond the scope of this RFC.
### Enabling encryption for existing tenants
To enable encryption for an existing tenant, we may simply call the key-rotation API (to generate a key),
and then the re-key API (to rewrite existing data using this key).
## Observability
- To enable some external service to implement re-keying, we should publish metrics per-timeline
on the age of their latest encryption key.
- Calls to KMS should be tracked with typical request rate/result/latency histograms to enable
detection of a slow KMS server and/or errors.
## Alternatives considered
### Use same tenant key for safekeeper and pageserver
We could halve the number of keys in circulation by having the safekeeper and pageserver
share a key rather than working independently.
However, this would be substantially more complex to implement, as safekeepers and pageservers
currently share no storage, so some new communication path would be needed. There is minimal
upside in sharing a key.
### No KMS dependency
We could choose to do all key management ourselves. However, the industry standard approach
to enabling users of cloud SaaS software to self-manage keys is to use the KMS as the intermediary
between our system and the user's control of their key. Although this RFC does not propose user-managed keys, we should design with this in mind.
### Do all key generation/wrapping in KMS service
We could avoid generating and wrapping/unwrapping object keys in our storage
services by delegating all responsibility for key operations to the KMS. However,
KMS services have limited throughput and in some cases may charge per operation, so
it is useful to avoid doing KMS operations per-object, and restrict them to per-timeline
frequency.
### Per-tenant instead of per-timeline pageserver keys
For tenants with many timelines, we may reduce load on KMS service by
using per-tenant instead of per-timeline keys, so that we may do operations
such as creating a timeline without needing to do a KMS unwrap operation.
However, per-timeline key management is much simpler to implement on the safekeeper,
which currently has no concept of a tenant (other than as a namespace for timelines).
It is also slightly simpler to implement on the pageserver, as it avoids implementing
a tenant-scoped creation operation to initialize keys (instead, we may initialize keys
during timeline creation).
As a side benefit, per-timeline key management also enables implementing secure deletion in future
at a per-timeline granularity.

View File

@@ -160,7 +160,7 @@ pub struct CatalogObjects {
pub databases: Vec<Database>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ComputeCtlConfig {
/// Set of JSON web keys that the compute can use to authenticate
/// communication from the control plane.
@@ -179,7 +179,7 @@ impl Default for ComputeCtlConfig {
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct TlsConfig {
pub key_path: String,
pub cert_path: String,

View File

@@ -183,6 +183,9 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Download all SLRU files on demand
LazySlruDownload,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -14,6 +14,7 @@ futures.workspace = true
hyper0.workspace = true
itertools.workspace = true
jemalloc_pprof.workspace = true
jsonwebtoken.workspace = true
once_cell.workspace = true
pprof.workspace = true
regex.workspace = true

View File

@@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use jsonwebtoken::TokenData;
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
use once_cell::sync::Lazy;
use pprof::ProfilerGuardBuilder;
@@ -618,7 +619,7 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
})?;
let token = parse_token(header_value)?;
let data = auth.decode(token).map_err(|err| {
let data: TokenData<Claims> = auth.decode(token).map_err(|err| {
warn!("Authentication error: {err}");
// Rely on From<AuthError> for ApiError impl
err

View File

@@ -29,6 +29,7 @@ futures = { workspace = true }
jsonwebtoken.workspace = true
nix = { workspace = true, features = ["ioctl"] }
once_cell.workspace = true
pem.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
serde.workspace = true

View File

@@ -11,7 +11,8 @@ use camino::Utf8Path;
use jsonwebtoken::{
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
};
use serde::{Deserialize, Serialize};
use pem::Pem;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::id::TenantId;
@@ -73,7 +74,10 @@ impl SwappableJwtAuth {
pub fn swap(&self, jwt_auth: JwtAuth) {
self.0.swap(Arc::new(jwt_auth));
}
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, AuthError> {
pub fn decode<D: DeserializeOwned>(
&self,
token: &str,
) -> std::result::Result<TokenData<D>, AuthError> {
self.0.load().decode(token)
}
}
@@ -148,7 +152,10 @@ impl JwtAuth {
/// The function tries the stored decoding keys in succession,
/// and returns the first yielding a successful result.
/// If there is no working decoding key, it returns the last error.
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, AuthError> {
pub fn decode<D: DeserializeOwned>(
&self,
token: &str,
) -> std::result::Result<TokenData<D>, AuthError> {
let mut res = None;
for decoding_key in &self.decoding_keys {
res = Some(decode(token, decoding_key, &self.validation));
@@ -173,8 +180,8 @@ impl std::fmt::Debug for JwtAuth {
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn encode_from_key_file<S: Serialize>(claims: &S, key_data: &[u8]) -> Result<String> {
let key = EncodingKey::from_ed_pem(key_data)?;
pub fn encode_from_key_file<S: Serialize>(claims: &S, pem: &Pem) -> Result<String> {
let key = EncodingKey::from_ed_der(pem.contents());
Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?)
}
@@ -188,13 +195,13 @@ mod tests {
//
// openssl genpkey -algorithm ed25519 -out ed25519-priv.pem
// openssl pkey -in ed25519-priv.pem -pubout -out ed25519-pub.pem
const TEST_PUB_KEY_ED25519: &[u8] = br#"
const TEST_PUB_KEY_ED25519: &str = r#"
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
-----END PUBLIC KEY-----
"#;
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
const TEST_PRIV_KEY_ED25519: &str = r#"
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
-----END PRIVATE KEY-----
@@ -222,9 +229,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519.as_bytes()).unwrap(),
]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
let claims_from_token: Claims = auth.decode(encoded_eddsa).unwrap().claims;
assert_eq!(claims_from_token, expected_claims);
}
@@ -235,13 +242,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
scope: Scope::Tenant,
};
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
let pem = pem::parse(TEST_PRIV_KEY_ED25519).unwrap();
let encoded = encode_from_key_file(&claims, &pem).unwrap();
// decode it back
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519.as_bytes()).unwrap(),
]);
let decoded = auth.decode(&encoded).unwrap();
let decoded: TokenData<Claims> = auth.decode(&encoded).unwrap();
assert_eq!(decoded.claims, claims);
}

View File

@@ -10,6 +10,8 @@ default = []
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
fuzz-read-path = ["testing"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
@@ -33,6 +35,7 @@ humantime.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -126,7 +126,7 @@ async fn ingest(
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;

View File

@@ -73,6 +73,7 @@ impl From<GetVectoredError> for BasebackupError {
/// * When working without safekeepers. In this situation it is important to match the lsn
/// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
/// to start the replication.
#[allow(clippy::too_many_arguments)]
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
@@ -80,6 +81,7 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
replica: bool,
lazy_slru_download_enabled: bool,
ctx: &'a RequestContext,
) -> Result<(), BasebackupError>
where
@@ -131,8 +133,8 @@ where
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
backup_lsn, prev_lsn, full_backup, replica
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={}, lazy_slru_download_enabled={})",
backup_lsn, prev_lsn, full_backup, replica, lazy_slru_download_enabled
);
let basebackup = Basebackup {
@@ -142,6 +144,7 @@ where
prev_record_lsn: prev_lsn,
full_backup,
replica,
lazy_slru_download_enabled,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
@@ -170,6 +173,7 @@ where
prev_record_lsn: Lsn,
full_backup: bool,
replica: bool,
lazy_slru_download_enabled: bool,
ctx: &'a RequestContext,
io_concurrency: IoConcurrency,
}
@@ -308,7 +312,10 @@ where
self.timeline.pg_version,
)?;
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let lazy_slru_download = self
.timeline
.get_lazy_slru_download(self.lazy_slru_download_enabled)
&& !self.full_backup;
let pgversion = self.timeline.pg_version;
let subdirs = dispatch_pgversion!(pgversion, &pgv::bindings::PGDATA_SUBDIRS[..]);

View File

@@ -1714,6 +1714,28 @@ pub enum SmgrQueryType {
Test,
}
#[derive(
Debug,
Clone,
Copy,
IntoStaticStr,
strum_macros::EnumCount,
strum_macros::EnumIter,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
pub enum GetPageBatchBreakReason {
BatchFull,
NonBatchableRequest,
NonUniformLsn,
SamePageAtDifferentLsn,
NonUniformTimeline,
ExecutorSteal,
#[cfg(feature = "testing")]
NonUniformKey,
}
pub(crate) struct SmgrQueryTimePerTimeline {
global_started: [IntCounter; SmgrQueryType::COUNT],
global_latency: [Histogram; SmgrQueryType::COUNT],
@@ -1725,6 +1747,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros: IntCounter,
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
global_batch_break_reason: [IntCounter; GetPageBatchBreakReason::COUNT],
per_timeline_batch_break_reason: GetPageBatchBreakReasonTimelineMetrics,
throttling: Arc<tenant_throttling::Pagestream>,
}
@@ -1858,6 +1882,49 @@ static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::n
.expect("failed to define a metric")
});
static PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
// It's a counter, but the name is chosen so that it can later be extended to a histogram of queue depth.
"pageserver_page_service_batch_break_reason_global",
"Reason for breaking batches of get page requests",
&["reason"],
)
.expect("failed to define a metric")
});
struct GetPageBatchBreakReasonTimelineMetrics {
map: EnumMap<GetPageBatchBreakReason, IntCounter>,
}
impl GetPageBatchBreakReasonTimelineMetrics {
fn new(tenant_id: &str, shard_slug: &str, timeline_id: &str) -> Self {
GetPageBatchBreakReasonTimelineMetrics {
map: EnumMap::from_array(std::array::from_fn(|reason_idx| {
let reason = GetPageBatchBreakReason::from_usize(reason_idx);
PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.with_label_values(&[
tenant_id,
shard_slug,
timeline_id,
reason.into(),
])
})),
}
}
fn inc(&self, reason: GetPageBatchBreakReason) {
self.map[reason].inc()
}
}
static PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_batch_break_reason",
"Reason for breaking batches of get page requests",
&["tenant_id", "shard_id", "timeline_id", "reason"],
)
.expect("failed to define a metric")
});
pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_page_service_config_max_batch_size",
@@ -1985,6 +2052,15 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
let global_batch_break_reason = std::array::from_fn(|i| {
let reason = GetPageBatchBreakReason::from_usize(i);
PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL
.get_metric_with_label_values(&[reason.into()])
.unwrap()
});
let per_timeline_batch_break_reason =
GetPageBatchBreakReasonTimelineMetrics::new(&tenant_id, &shard_slug, &timeline_id);
let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
@@ -2002,6 +2078,8 @@ impl SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros,
global_batch_wait_time,
per_timeline_batch_wait_time,
global_batch_break_reason,
per_timeline_batch_break_reason,
throttling: pagestream_throttle_metrics,
}
}
@@ -2030,9 +2108,16 @@ impl SmgrQueryTimePerTimeline {
}
/// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
pub(crate) fn observe_getpage_batch_start(
&self,
batch_size: usize,
break_reason: GetPageBatchBreakReason,
) {
self.global_batch_size.observe(batch_size as f64);
self.per_timeline_batch_size.observe(batch_size as f64);
self.global_batch_break_reason[break_reason.into_usize()].inc();
self.per_timeline_batch_break_reason.inc(break_reason);
}
}
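
Note on the per-reason counters above: the global metric is held as a fixed-size array indexed by the enum discriminant, so recording a break reason is a plain array lookup. A minimal sketch of that pattern follows, using only the standard library. `BreakReason`, `BreakReasonCounters`, `REASON_COUNT`, and `label` are hypothetical stand-ins for illustration; the diff itself uses `strum`, `enum_map`, and Prometheus `IntCounter`s, which are not reproduced here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for `GetPageBatchBreakReason` (testing-only variant omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakReason {
    BatchFull,
    NonBatchableRequest,
    NonUniformLsn,
    SamePageAtDifferentLsn,
    NonUniformTimeline,
    ExecutorSteal,
}

/// Number of variants; the diff derives this via `strum_macros::EnumCount`.
pub const REASON_COUNT: usize = 6;

pub const ALL_REASONS: [BreakReason; REASON_COUNT] = [
    BreakReason::BatchFull,
    BreakReason::NonBatchableRequest,
    BreakReason::NonUniformLsn,
    BreakReason::SamePageAtDifferentLsn,
    BreakReason::NonUniformTimeline,
    BreakReason::ExecutorSteal,
];

impl BreakReason {
    /// Snake-case label, mirroring `#[strum(serialize_all = "snake_case")]`.
    pub fn label(self) -> &'static str {
        match self {
            BreakReason::BatchFull => "batch_full",
            BreakReason::NonBatchableRequest => "non_batchable_request",
            BreakReason::NonUniformLsn => "non_uniform_lsn",
            BreakReason::SamePageAtDifferentLsn => "same_page_at_different_lsn",
            BreakReason::NonUniformTimeline => "non_uniform_timeline",
            BreakReason::ExecutorSteal => "executor_steal",
        }
    }
}

/// One counter per reason, indexed by the enum discriminant.
#[derive(Default)]
pub struct BreakReasonCounters {
    counters: [AtomicU64; REASON_COUNT],
}

impl BreakReasonCounters {
    pub fn inc(&self, reason: BreakReason) {
        self.counters[reason as usize].fetch_add(1, Ordering::Relaxed);
    }

    pub fn get(&self, reason: BreakReason) -> u64 {
        self.counters[reason as usize].load(Ordering::Relaxed)
    }

    /// (label, value) pairs in declaration order.
    pub fn snapshot(&self) -> Vec<(&'static str, u64)> {
        ALL_REASONS.iter().map(|r| (r.label(), self.get(*r))).collect()
    }
}
```

Indexing by discriminant keeps the hot path free of label lookups. The label strings only matter when the counters are created or exported.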
@@ -3398,6 +3483,15 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
for reason in GetPageBatchBreakReason::iter() {
let _ = PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
reason.into(),
]);
}
}
}
@@ -4276,6 +4370,7 @@ pub fn preinitialize_metrics(
[
&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
&SMGR_QUERY_STARTED_GLOBAL,
&PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL,
]
.into_iter()
.for_each(|c| {

@@ -15,6 +15,7 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -58,8 +59,8 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
TimelineMetrics,
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::span::{
@@ -672,6 +673,7 @@ enum BatchedFeMessage {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
batch_break_reason: GetPageBatchBreakReason,
},
DbSize {
span: Span,
@@ -724,6 +726,119 @@ impl BatchedFeMessage {
BatchedFeMessage::RespondError { .. } => {}
}
}
fn should_break_batch(
&self,
other: &BatchedFeMessage,
max_batch_size: NonZeroUsize,
batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
) -> Option<GetPageBatchBreakReason> {
match (self, other) {
(
BatchedFeMessage::GetPage {
shard: accum_shard,
pages: accum_pages,
..
},
BatchedFeMessage::GetPage {
shard: this_shard,
pages: this_pages,
..
},
) => {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return Some(GetPageBatchBreakReason::BatchFull);
}
if !accum_shard.is_same_handle_as(this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return Some(GetPageBatchBreakReason::NonUniformTimeline);
}
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
"stopping batching because LSN changed"
);
return Some(GetPageBatchBreakReason::NonUniformLsn);
}
}
}
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
// The read path doesn't currently support serving the same page at different LSNs.
// While technically possible, it's uncertain if the complexity is worth it.
// Break the batch if such a case is encountered.
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
"stopping batching because same page was requested at different LSNs"
);
return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
}
}
}
None
}
#[cfg(feature = "testing")]
(
BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
},
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) => {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return Some(GetPageBatchBreakReason::BatchFull);
}
if !accum_shard.is_same_handle_as(this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return Some(GetPageBatchBreakReason::NonUniformTimeline);
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return Some(GetPageBatchBreakReason::NonUniformKey);
}
None
}
(_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
}
}
}
impl PageServerHandler {
@@ -1047,6 +1162,10 @@ impl PageServerHandler {
effective_request_lsn,
ctx,
}],
// The executor grabs the batch when it becomes idle.
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
// default reason for breaking the batch.
batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
}
}
#[cfg(feature = "testing")]
@@ -1084,118 +1203,59 @@ impl PageServerHandler {
Err(e) => return Err(Err(e)),
};
match (&mut *batch, this_msg) {
// something batched already, let's see if we can add this message to the batch
(
Ok(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
},
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
{
return false;
}
}
}
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
// The read path doesn't currently support serving the same page at different LSNs.
// While technically possible, it's uncertain if the complexity is worth it.
// Break the batch if such a case is encountered.
//
// TODO(vlad): Include a metric for batch breaks with a reason label.
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
"stopping batching because same page was requested at different LSNs"
);
return false;
}
}
}
true
})() =>
{
// ok to batch
accum_pages.extend(this_pages);
Ok(())
let eligible_batch = match batch {
Ok(b) => b,
Err(_) => {
return Err(Ok(this_msg));
}
#[cfg(feature = "testing")]
(
Ok(BatchedFeMessage::Test {
shard: accum_shard,
requests: accum_requests,
..
}),
BatchedFeMessage::Test {
shard: this_shard,
requests: this_requests,
..
},
) if (|| {
assert!(this_requests.len() == 1);
if accum_requests.len() >= max_batch_size.get() {
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
};
let batch_break =
eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
match batch_break {
Some(reason) => {
if let BatchedFeMessage::GetPage {
batch_break_reason, ..
} = eligible_batch
{
*batch_break_reason = reason;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
let this_batch_key = this_requests[0].req.batch_key;
let accum_batch_key = accum_requests[0].req.batch_key;
if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
return false;
}
true
})() =>
{
// ok to batch
accum_requests.extend(this_requests);
Ok(())
}
// something batched already but this message is unbatchable
(_, this_msg) => {
// by default, don't continue batching
Err(Ok(this_msg))
}
None => {
// ok to batch
match (eligible_batch, this_msg) {
(
BatchedFeMessage::GetPage {
pages: accum_pages, ..
},
BatchedFeMessage::GetPage {
pages: this_pages, ..
},
) => {
accum_pages.extend(this_pages);
Ok(())
}
#[cfg(feature = "testing")]
(
BatchedFeMessage::Test {
requests: accum_requests,
..
},
BatchedFeMessage::Test {
requests: this_requests,
..
},
) => {
accum_requests.extend(this_requests);
Ok(())
}
// Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
_ => unreachable!(),
}
}
}
}
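
Note on the refactoring above: the "can this message join the batch?" decision now returns `Option<reason>` instead of `bool`, so the caller can record why a batch ended. A simplified sketch of that shape is below. `PageReq`, `Strategy`, `BreakReason`, and `try_push` are hypothetical, reduced types for illustration. The shard-handle check and the testing-only branch from the diff are left out.

```rust
/// Hypothetical, reduced set of break reasons.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakReason {
    BatchFull,
    NonUniformLsn,
    SamePageAtDifferentLsn,
}

/// Hypothetical stand-in for the batching strategy enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
    UniformLsn,
    ScatteredLsn,
}

/// Hypothetical page request: a page identifier and the LSN it is read at.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PageReq {
    pub page: u32,
    pub lsn: u64,
}

/// Returns `Some(reason)` if `next` must not be added to `accum`, else `None`.
pub fn should_break_batch(
    accum: &[PageReq],
    next: PageReq,
    max_batch_size: usize,
    strategy: Strategy,
) -> Option<BreakReason> {
    if accum.len() >= max_batch_size {
        return Some(BreakReason::BatchFull);
    }
    match strategy {
        Strategy::UniformLsn => {
            // Every request in the batch must share one LSN.
            if let Some(last) = accum.last() {
                if last.lsn != next.lsn {
                    return Some(BreakReason::NonUniformLsn);
                }
            }
        }
        Strategy::ScatteredLsn => {
            // Mixed LSNs are fine, but not the same page at two different LSNs.
            if accum.iter().any(|r| r.page == next.page && r.lsn != next.lsn) {
                return Some(BreakReason::SamePageAtDifferentLsn);
            }
        }
    }
    None
}

/// Appends `next` when batching is allowed; otherwise reports the reason.
pub fn try_push(
    accum: &mut Vec<PageReq>,
    next: PageReq,
    max_batch_size: usize,
    strategy: Strategy,
) -> Result<(), BreakReason> {
    match should_break_batch(accum, next, max_batch_size, strategy) {
        Some(reason) => Err(reason),
        None => {
            accum.push(next);
            Ok(())
        }
    }
}
```

Keeping the decision separate from the mutation means the check only needs a shared borrow of the batch. The diff follows the same split: it calls `should_break_batch` first, then either stores the reason or extends the batch.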
@@ -1413,7 +1473,12 @@ impl PageServerHandler {
span,
)
}
BatchedFeMessage::GetPage { span, shard, pages } => {
BatchedFeMessage::GetPage {
span,
shard,
pages,
batch_break_reason,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
@@ -1425,6 +1490,7 @@ impl PageServerHandler {
&shard,
pages,
io_concurrency,
batch_break_reason,
&ctx,
)
.instrument(span.clone())
@@ -2113,13 +2179,14 @@ impl PageServerHandler {
timeline: &Timeline,
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
io_concurrency: IoConcurrency,
batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
timeline
.query_metrics
.observe_getpage_batch_start(requests.len());
.observe_getpage_batch_start(requests.len(), batch_break_reason);
// If a page trace is running, submit an event for this request.
if let Some(page_trace) = timeline.page_trace.load().as_ref() {
@@ -2327,6 +2394,7 @@ impl PageServerHandler {
full_backup: bool,
gzip: bool,
replica: bool,
lazy_slru_download: bool,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
@@ -2394,6 +2462,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2417,6 +2486,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2434,6 +2504,7 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
lazy_slru_download,
&ctx,
)
.await
@@ -2483,7 +2554,7 @@ impl PageServerHandler {
}
}
/// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
/// `basebackup tenant timeline [lsn] [--gzip] [--replica] [--lazy-slru-download]`
#[derive(Debug, Clone, Eq, PartialEq)]
struct BaseBackupCmd {
tenant_id: TenantId,
@@ -2491,6 +2562,7 @@ struct BaseBackupCmd {
lsn: Option<Lsn>,
gzip: bool,
replica: bool,
lazy_slru_download: bool,
}
/// `fullbackup tenant timeline [lsn] [prev_lsn]`
@@ -2623,6 +2695,7 @@ impl BaseBackupCmd {
let mut gzip = false;
let mut replica = false;
let mut lazy_slru_download = false;
for &param in &parameters[flags_parse_from..] {
match param {
@@ -2638,6 +2711,12 @@ impl BaseBackupCmd {
}
replica = true
}
"--lazy-slru-download" => {
if lazy_slru_download {
bail!("duplicate parameter for basebackup command: {param}")
}
lazy_slru_download = true
}
_ => bail!("invalid parameter for basebackup command: {param}"),
}
}
@@ -2647,6 +2726,7 @@ impl BaseBackupCmd {
lsn,
gzip,
replica,
lazy_slru_download,
})
}
}
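
Note on the flag parsing above: each optional flag may appear at most once, and unknown flags are rejected. The same rule can be sketched in a table-like form, shown below. `Flags` and `parse_flags` are hypothetical names for illustration, and the sketch returns `String` errors rather than the `anyhow` errors that `bail!` produces in the diff.

```rust
/// Hypothetical holder for the optional basebackup flags.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Flags {
    pub gzip: bool,
    pub replica: bool,
    pub lazy_slru_download: bool,
}

/// Parses trailing `--flag` parameters, rejecting duplicates and unknown flags.
pub fn parse_flags(params: &[&str]) -> Result<Flags, String> {
    let mut flags = Flags::default();
    for &param in params {
        // Map the flag name to the field it sets.
        let slot = match param {
            "--gzip" => &mut flags.gzip,
            "--replica" => &mut flags.replica,
            "--lazy-slru-download" => &mut flags.lazy_slru_download,
            _ => return Err(format!("invalid parameter for basebackup command: {param}")),
        };
        if *slot {
            return Err(format!("duplicate parameter for basebackup command: {param}"));
        }
        *slot = true;
    }
    Ok(flags)
}
```

With this shape, adding another flag is one more `match` arm, and the duplicate check is written once rather than per flag.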
@@ -2771,7 +2851,7 @@ where
) -> Result<(), QueryError> {
// This unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
// which requires auth to be present.
let data = self
let data: TokenData<Claims> = self
.auth
.as_ref()
.unwrap()
@@ -2861,6 +2941,7 @@ where
lsn,
gzip,
replica,
lazy_slru_download,
}) => {
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
@@ -2882,6 +2963,7 @@ where
false,
gzip,
replica,
lazy_slru_download,
&ctx,
)
.await?;
@@ -2919,6 +3001,7 @@ where
true,
false,
false,
false,
&ctx,
)
.await?;
@@ -3053,7 +3136,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd =
@@ -3065,7 +3149,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: true,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd =
@@ -3077,7 +3162,8 @@ mod tests {
timeline_id,
lsn: None,
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
@@ -3089,7 +3175,8 @@ mod tests {
timeline_id,
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
gzip: false,
replica: false
replica: false,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!(
@@ -3103,7 +3190,23 @@ mod tests {
timeline_id,
lsn: None,
gzip: true,
replica: true
replica: true,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!(
"basebackup {tenant_id} {timeline_id} --replica --gzip --lazy-slru-download"
))
.unwrap();
assert_eq!(
cmd,
PageServiceCmd::BaseBackup(BaseBackupCmd {
tenant_id,
timeline_id,
lsn: None,
gzip: true,
replica: true,
lazy_slru_download: true
})
);
let cmd = PageServiceCmd::parse(&format!(
@@ -3117,7 +3220,8 @@ mod tests {
timeline_id,
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
gzip: true,
replica: true
replica: true,
lazy_slru_download: false
})
);
let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();

@@ -5933,12 +5933,20 @@ mod tests {
use models::CompactLsnRange;
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
#[cfg(feature = "testing")]
use rand::rngs::StdRng;
use rand::{Rng, thread_rng};
#[cfg(feature = "testing")]
use std::ops::Range;
use storage_layer::{IoConcurrency, PersistentLayerKey};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
@@ -5960,6 +5968,318 @@ mod tests {
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
#[cfg(feature = "testing")]
struct TestTimelineSpecification {
start_lsn: Lsn,
last_record_lsn: Lsn,
in_memory_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
delta_layers_shape: Vec<(Range<Key>, Range<Lsn>)>,
image_layers_shape: Vec<(Range<Key>, Lsn)>,
gap_chance: u8,
will_init_chance: u8,
}
#[cfg(feature = "testing")]
struct Storage {
storage: HashMap<(Key, Lsn), Value>,
start_lsn: Lsn,
}
#[cfg(feature = "testing")]
impl Storage {
fn get(&self, key: Key, lsn: Lsn) -> Bytes {
use bytes::BufMut;
let mut crnt_lsn = lsn;
let mut got_base = false;
let mut acc = Vec::new();
while crnt_lsn >= self.start_lsn {
if let Some(value) = self.storage.get(&(key, crnt_lsn)) {
acc.push(value.clone());
match value {
Value::WalRecord(NeonWalRecord::Test { will_init, .. }) => {
if *will_init {
got_base = true;
break;
}
}
Value::Image(_) => {
got_base = true;
break;
}
_ => unreachable!(),
}
}
crnt_lsn = crnt_lsn.checked_sub(1u64).unwrap();
}
assert!(
got_base,
"Input data was incorrect. No base image for {key}@{lsn}"
);
tracing::debug!("Wal redo depth for {key}@{lsn} is {}", acc.len());
let mut blob = BytesMut::new();
for value in acc.into_iter().rev() {
match value {
Value::WalRecord(NeonWalRecord::Test { append, .. }) => {
blob.extend_from_slice(append.as_bytes());
}
Value::Image(img) => {
blob.put(img);
}
_ => unreachable!(),
}
}
blob.into()
}
}
#[cfg(feature = "testing")]
#[allow(clippy::too_many_arguments)]
async fn randomize_timeline(
tenant: &Arc<Tenant>,
new_timeline_id: TimelineId,
pg_version: u32,
spec: TestTimelineSpecification,
random: &mut rand::rngs::StdRng,
ctx: &RequestContext,
) -> anyhow::Result<(Arc<Timeline>, Storage, Vec<Lsn>)> {
let mut storage: HashMap<(Key, Lsn), Value> = HashMap::default();
let mut interesting_lsns = vec![spec.last_record_lsn];
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
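if gap {
// Leave this (key, lsn) slot empty. Advance the key so the gap is actually skipped
// instead of re-rolling the dice for the same key.
key = key.next();
continue;
}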
let record = if will_init {
Value::WalRecord(NeonWalRecord::wal_init(format!("[will_init {key}@{lsn}]")))
} else {
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
};
storage.insert((key, lsn), record);
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
// Stash some interesting LSNs for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(lsn_range.start);
} else {
let below = lsn_range.start.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn_range.start.0 + offset);
interesting_lsns.push(above);
}
}
}
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
let gap = random.gen_range(1..=100) <= spec.gap_chance;
let will_init = random.gen_range(1..=100) <= spec.will_init_chance;
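if gap {
// Leave this (key, lsn) slot empty. Advance the key so the gap is actually skipped
// instead of re-rolling the dice for the same key.
key = key.next();
continue;
}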
let record = if will_init {
Value::WalRecord(NeonWalRecord::wal_init(format!("[will_init {key}@{lsn}]")))
} else {
Value::WalRecord(NeonWalRecord::wal_append(format!("[delta {key}@{lsn}]")))
};
storage.insert((key, lsn), record);
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
// Stash some interesting LSNs for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(lsn_range.start);
} else {
let below = lsn_range.start.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn_range.start.0 + offset);
interesting_lsns.push(above);
}
}
}
for (key_range, lsn) in spec.image_layers_shape.iter() {
let mut key = key_range.start;
while key < key_range.end {
let blob = Bytes::from(format!("[image {key}@{lsn}]"));
let record = Value::Image(blob.clone());
storage.insert((key, *lsn), record);
key = key.next();
}
// Stash some interesting LSNs for future use
for offset in [0, 5, 100].iter() {
if *offset == 0 {
interesting_lsns.push(*lsn);
} else {
let below = lsn.checked_sub(*offset);
match below {
Some(v) if v >= spec.start_lsn => {
interesting_lsns.push(v);
}
_ => {}
}
let above = Lsn(lsn.0 + offset);
interesting_lsns.push(above);
}
}
}
let in_memory_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn_range) in spec.in_memory_layers_shape.iter() {
let mut data = Vec::new();
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, lsn)) {
data.push((key, lsn, record.clone()));
}
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
acc.push(InMemoryLayerTestDesc {
data,
lsn_range: lsn_range.clone(),
is_open: false,
})
}
acc
};
let delta_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn_range) in spec.delta_layers_shape.iter() {
let mut data = Vec::new();
let mut lsn = lsn_range.start;
while lsn < lsn_range.end {
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, lsn)) {
data.push((key, lsn, record.clone()));
}
key = key.next();
}
lsn = Lsn(lsn.0 + 1);
}
acc.push(DeltaLayerTestDesc {
data,
lsn_range: lsn_range.clone(),
key_range: key_range.clone(),
})
}
acc
};
let image_test_layers = {
let mut acc = Vec::new();
for (key_range, lsn) in spec.image_layers_shape.iter() {
let mut data = Vec::new();
let mut key = key_range.start;
while key < key_range.end {
if let Some(record) = storage.get(&(key, *lsn)) {
let blob = match record {
Value::Image(blob) => blob.clone(),
_ => unreachable!(),
};
data.push((key, blob));
}
key = key.next();
}
acc.push((*lsn, data));
}
acc
};
let tline = tenant
.create_test_timeline_with_layers(
new_timeline_id,
spec.start_lsn,
pg_version,
ctx,
in_memory_test_layers,
delta_test_layers,
image_test_layers,
spec.last_record_lsn,
)
.await?;
Ok((
tline,
Storage {
storage,
start_lsn: spec.start_lsn,
},
interesting_lsns,
))
}
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
@@ -10543,6 +10863,214 @@ mod tests {
Ok(())
}
// A randomized read path test. Generates a layer map according to a deterministic
// specification. Fills the (key, LSN) space in a random manner and then performs
// random scattered queries validating the results against in-memory storage.
//
// See this internal Notion page for a diagram of the layer map:
// https://www.notion.so/neondatabase/Read-Path-Unit-Testing-Fuzzing-1d1f189e0047806c8e5cd37781b0a350?pvs=4
//
// A fuzzing mode is also supported. In this mode, the test will use a random
// seed instead of a hardcoded one. Use it in conjunction with `cargo stress`
// to run multiple instances in parallel:
//
// $ RUST_BACKTRACE=1 RUST_LOG=INFO \
// cargo stress --package=pageserver --features=testing,fuzz-read-path --release -- test_read_path
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_read_path() -> anyhow::Result<()> {
use rand::seq::SliceRandom;
let seed = if cfg!(feature = "fuzz-read-path") {
let seed: u64 = thread_rng().r#gen();
seed
} else {
// Use a hard-coded seed when not in fuzzing mode.
// Note that with the current approach results are not reproducible
// across platforms and Rust releases.
const SEED: u64 = 0;
SEED
};
let mut random = StdRng::seed_from_u64(seed);
let (queries, will_init_chance, gap_chance) = if cfg!(feature = "fuzz-read-path") {
const QUERIES: u64 = 5000;
let will_init_chance: u8 = random.gen_range(0..=10);
let gap_chance: u8 = random.gen_range(0..=50);
(QUERIES, will_init_chance, gap_chance)
} else {
const QUERIES: u64 = 1000;
const WILL_INIT_CHANCE: u8 = 1;
const GAP_CHANCE: u8 = 5;
(QUERIES, WILL_INIT_CHANCE, GAP_CHANCE)
};
let harness = TenantHarness::create("test_read_path").await?;
let (tenant, ctx) = harness.load().await;
tracing::info!("Using random seed: {seed}");
tracing::info!(%will_init_chance, %gap_chance, "Fill params");
// Define the layer map shape. Note that this part is not randomized.
const KEY_DIMENSION_SIZE: u32 = 99;
let start_key = Key::from_hex("110000000033333333444444445500000000").unwrap();
let end_key = start_key.add(KEY_DIMENSION_SIZE);
let total_key_range = start_key..end_key;
let total_key_range_size = end_key.to_i128() - start_key.to_i128();
let total_start_lsn = Lsn(104);
let last_record_lsn = Lsn(504);
assert!(total_key_range_size % 3 == 0);
let in_memory_layers_shape = vec![
(total_key_range.clone(), Lsn(304)..Lsn(400)),
(total_key_range.clone(), Lsn(400)..last_record_lsn),
];
let delta_layers_shape = vec![
(
start_key..(start_key.add((total_key_range_size / 3) as u32)),
Lsn(200)..Lsn(304),
),
(
(start_key.add((total_key_range_size / 3) as u32))
..(start_key.add((total_key_range_size * 2 / 3) as u32)),
Lsn(200)..Lsn(304),
),
(
(start_key.add((total_key_range_size * 2 / 3) as u32))
..(start_key.add(total_key_range_size as u32)),
Lsn(200)..Lsn(304),
),
];
let image_layers_shape = vec![
(
start_key.add((total_key_range_size * 2 / 3 - 10) as u32)
..start_key.add((total_key_range_size * 2 / 3 + 10) as u32),
Lsn(456),
),
(
start_key.add((total_key_range_size / 3 - 10) as u32)
..start_key.add((total_key_range_size / 3 + 10) as u32),
Lsn(256),
),
(total_key_range.clone(), total_start_lsn),
];
let specification = TestTimelineSpecification {
start_lsn: total_start_lsn,
last_record_lsn,
in_memory_layers_shape,
delta_layers_shape,
image_layers_shape,
gap_chance,
will_init_chance,
};
// Create and randomly fill in the layers according to the specification
let (tline, storage, interesting_lsns) = randomize_timeline(
&tenant,
TIMELINE_ID,
DEFAULT_PG_VERSION,
specification,
&mut random,
&ctx,
)
.await?;
// Now generate queries based on the interesting lsns that we've collected.
//
// While there's still room in the query, pick an interesting LSN and a random
// key. Then roll the dice to see if the next key should also be included in
// the query. When the roll fails, break the "batch" and pick another point in the
// (key, LSN) space.
const PICK_NEXT_CHANCE: u8 = 50;
for _ in 0..queries {
let query = {
let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
let mut used_keys: HashSet<Key> = HashSet::default();
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
if used_keys.contains(&selected_key)
|| selected_key >= start_key.add(KEY_DIMENSION_SIZE)
{
break;
}
keyspaces_at_lsn
.entry(*selected_lsn)
.or_default()
.add_key(selected_key);
used_keys.insert(selected_key);
let pick_next = random.gen_range(0..=100) <= PICK_NEXT_CHANCE;
if pick_next {
selected_key = selected_key.next();
} else {
break;
}
}
}
VersionedKeySpaceQuery::scattered(
keyspaces_at_lsn
.into_iter()
.map(|(lsn, acc)| (lsn, acc.to_keyspace()))
.collect(),
)
};
// Run the query and validate the results
let results = tline
.get_vectored(query.clone(), IoConcurrency::Sequential, &ctx)
.await;
let blobs = match results {
Ok(ok) => ok,
Err(err) => {
panic!("seed={seed} Error returned for query {query}: {err}");
}
};
for (key, key_res) in blobs.into_iter() {
match key_res {
Ok(blob) => {
let requested_at_lsn = query.map_key_to_lsn(&key);
let expected = storage.get(key, requested_at_lsn);
if blob != expected {
tracing::error!(
"seed={seed} Mismatch for {key}@{requested_at_lsn} from query: {query}"
);
}
assert_eq!(blob, expected);
}
Err(err) => {
let requested_at_lsn = query.map_key_to_lsn(&key);
panic!(
"seed={seed} Error returned for {key}@{requested_at_lsn} from query {query}: {err}"
);
}
}
}
}
Ok(())
}
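
Note on the test oracle: the test checks read-path results against an in-memory model that walks LSNs downwards from the requested one, collects records until it reaches a base (an image or a will-init record), then replays them oldest first. A self-contained sketch of that idea is below. `Record` and `ReferenceStore` are hypothetical simplifications with integer keys and string payloads. Unlike `Storage::get` in the diff, which asserts that a base exists, this sketch returns `None` when none is found.

```rust
use std::collections::HashMap;

/// Hypothetical record type: a full image, or a delta that appends text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Record {
    Image(String),
    Delta { will_init: bool, append: String },
}

/// Hypothetical reference model keyed by (key, lsn).
pub struct ReferenceStore {
    pub records: HashMap<(u32, u64), Record>,
    pub start_lsn: u64,
}

impl ReferenceStore {
    /// Reconstructs the value of `key` as of `lsn`, or `None` if no base record exists.
    pub fn get(&self, key: u32, lsn: u64) -> Option<String> {
        let mut chain = Vec::new();
        let mut found_base = false;
        // Walk from the requested LSN down to the start of the timeline.
        for cur in (self.start_lsn..=lsn).rev() {
            if let Some(rec) = self.records.get(&(key, cur)) {
                chain.push(rec);
                let is_base = match rec {
                    Record::Image(_) => true,
                    Record::Delta { will_init, .. } => *will_init,
                };
                if is_base {
                    found_base = true;
                    break;
                }
            }
        }
        if !found_base {
            return None;
        }
        // Replay oldest first: base, then each later delta.
        let mut out = String::new();
        for rec in chain.into_iter().rev() {
            match rec {
                Record::Image(img) => out.push_str(img),
                Record::Delta { append, .. } => out.push_str(append),
            }
        }
        Some(out)
    }
}
```

Because the model scans one LSN at a time, it is only practical for small LSN ranges such as the few hundred LSNs this test uses.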
fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
(
k1.is_delta,

@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::context::RequestContext;
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
pub fn new(
inner: VirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
@@ -432,12 +439,14 @@ pub(crate) mod tests {
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr

@@ -714,7 +714,7 @@ impl LayerMap {
true
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
pub fn iter_historic_layers(&self) -> impl ExactSizeIterator<Item = Arc<PersistentLayerDesc>> {
self.historic.iter()
}
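
Note on the signature change above: returning `impl ExactSizeIterator` instead of `impl Iterator` lets callers ask for `.len()` without consuming the iterator. The diff does not say what the length is used for, so the pre-sizing use below is an assumption. `Layers` and `collect_with_capacity` are hypothetical names. The explicit `+ '_` bound is kept so the sketch also compiles on editions before Rust 2024.

```rust
/// Hypothetical container standing in for the layer map.
pub struct Layers {
    items: Vec<u32>,
}

impl Layers {
    pub fn new(items: Vec<u32>) -> Self {
        Layers { items }
    }

    /// The exact-size bound exposes `len()` to callers.
    pub fn iter(&self) -> impl ExactSizeIterator<Item = u32> + '_ {
        self.items.iter().copied()
    }
}

/// Example caller: pre-sizes the output buffer using the known length.
pub fn collect_with_capacity(layers: &Layers) -> Vec<u32> {
    let iter = layers.iter();
    let mut out = Vec::with_capacity(iter.len());
    out.extend(iter);
    out
}
```

The stronger bound is a promise about the underlying iterator: adapters such as `filter` do not implement `ExactSizeIterator`, so the function body can no longer use them without first collecting.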

@@ -504,7 +504,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
pub fn iter(&self) -> impl ExactSizeIterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {

@@ -5,6 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter {
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
tenant_shard_id: TenantShardId,
batches: BatchLayerWriter,
start_key: Key,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitImageLayerWriter {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
gate,
cancel.clone(),
ctx,
)
.await?,
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
gate,
cancel,
})
}
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
pub struct SplitDeltaLayerWriter<'a> {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
conf: &'static PageServerConf,
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitDeltaLayerWriter {
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
gate,
cancel,
})
}
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?,
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -469,6 +491,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -480,6 +504,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -546,6 +572,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -556,6 +584,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -643,6 +673,8 @@ mod tests {
get_key(0),
Lsn(0x18),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
&ctx,
)
.await
@@ -654,6 +686,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
@@ -730,6 +764,8 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();

@@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -400,12 +401,15 @@ impl DeltaLayerWriterInner {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
@@ -420,7 +424,7 @@ impl DeltaLayerWriterInner {
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -628,12 +632,15 @@ impl DeltaLayerWriter {
///
/// Start building a new delta layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -644,6 +651,8 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
gate,
cancel,
ctx,
)
.await?,
@@ -1885,6 +1894,8 @@ pub(crate) mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await?;
@@ -2079,6 +2090,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
&branch.gate,
branch.cancel.clone(),
ctx,
)
.await
@@ -2213,6 +2226,8 @@ pub(crate) mod test {
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

@@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -748,12 +749,15 @@ impl ImageLayerWriterInner {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
@@ -780,7 +784,7 @@ impl ImageLayerWriterInner {
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -988,18 +992,30 @@ impl ImageLayerWriter {
///
/// Start building a new image layer.
///
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
ImageLayerWriterInner::new(
conf,
timeline_id,
tenant_shard_id,
key_range,
lsn,
gate,
cancel,
ctx,
)
.await?,
),
})
}
@@ -1203,6 +1219,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1268,6 +1286,8 @@ mod test {
harness.tenant_shard_id,
&range,
lsn,
&timeline.gate,
timeline.cancel.clone(),
&ctx,
)
.await
@@ -1346,6 +1366,8 @@ mod test {
tenant.tenant_shard_id,
&key_range,
lsn,
&tline.gate,
tline.cancel.clone(),
ctx,
)
.await?;

@@ -719,6 +719,8 @@ impl InMemoryLayer {
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
@@ -759,6 +761,8 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
gate,
cancel,
ctx,
)
.await?;

@@ -2490,12 +2490,11 @@ impl Timeline {
tenant_conf.is_gc_blocked_by_lsn_lease_deadline()
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
pub(crate) fn get_lazy_slru_download(&self, lazy_slru_download_enabled_by_cp: bool) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lazy_slru_download
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
tenant_conf.tenant_conf.lazy_slru_download.unwrap_or(
lazy_slru_download_enabled_by_cp || self.conf.default_tenant_conf.lazy_slru_download,
)
}
/// Checks if a get page request should get perf tracing
@@ -4026,7 +4025,7 @@ impl VersionedKeySpaceQuery {
/// Returns LSN for a specific key.
///
/// Invariant: requested key must be part of [`Self::total_keyspace`]
fn map_key_to_lsn(&self, key: &Key) -> Lsn {
pub(super) fn map_key_to_lsn(&self, key: &Key) -> Lsn {
match self {
Self::Uniform { lsn, .. } => *lsn,
Self::Scattered { keyspaces_at_lsn } => {
@@ -4986,7 +4985,13 @@ impl Timeline {
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.write_to_disk(
&ctx,
key_range,
self_clone.l0_flush_global_state.inner(),
&self_clone.gate,
self_clone.cancel.clone(),
)
.await?
else {
return Ok(None);
@@ -5526,6 +5531,8 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -5694,6 +5701,12 @@ impl Timeline {
return;
}
if self.cancel.is_cancelled() {
// We already requested stopping the tenant, so we cannot wait for the logical size
// calculation to complete given the task might have been already cancelled.
return;
}
if let Some(await_bg_cancel) = self
.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore
@@ -6890,6 +6903,8 @@ impl Timeline {
self.tenant_shard_id,
&(min_key..end_key),
lsn,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;
@@ -6951,6 +6966,8 @@ impl Timeline {
self.tenant_shard_id,
deltas.key_range.start,
deltas.lsn_range,
&self.gate,
self.cancel.clone(),
ctx,
)
.await?;

@@ -56,7 +56,8 @@ use crate::tenant::storage_layer::batch_split_writer::{
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
ValueReconstructState,
};
use crate::tenant::tasks::log_compaction_error;
use crate::tenant::timeline::{
@@ -69,6 +70,13 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
/// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
///
/// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
/// shard split, which gets expensive for large tenants.
const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct GcCompactionJobId(pub usize);
@@ -749,8 +757,8 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
delta_writer: &mut SplitDeltaLayerWriter<'_>,
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -819,7 +827,15 @@ impl KeyHistoryRetention {
base_img: &Option<(Lsn, &Bytes)>,
history: &[(Lsn, &NeonWalRecord)],
tline: &Arc<Timeline>,
skip_empty: bool,
) -> anyhow::Result<()> {
if base_img.is_none() && history.is_empty() {
if skip_empty {
return Ok(());
}
anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
};
let mut records = history
.iter()
.map(|(lsn, val)| (*lsn, (*val).clone()))
@@ -860,17 +876,12 @@ impl KeyHistoryRetention {
if *retain_lsn >= min_lsn {
// Only verify after the key appears in the full history for the first time.
if base_img.is_none() && history.is_empty() {
anyhow::bail!(
"verificatoin failed: key {} has no history at {}",
key,
retain_lsn
);
};
// We don't modify history: in theory, we could replace the history with a single
// image as in `generate_key_retention` to make redos at later LSNs faster. But we
// want to verify everything as if they are read from the real layer map.
collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
.await
.context("below horizon retain_lsn")?;
}
}
@@ -878,13 +889,17 @@ impl KeyHistoryRetention {
match val {
Value::Image(img) => {
// Above the GC horizon, we verify every time we see an image.
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
.await
.context("above horizon full image")?;
base_img = Some((*lsn, img));
history.clear();
}
Value::WalRecord(rec) if val.will_init() => {
// Above the GC horizon, we verify every time we see an init record.
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
collect_and_verify(key, *lsn, &base_img, &history, tline, true)
.await
.context("above horizon init record")?;
base_img = None;
history.clear();
history.push((*lsn, rec));
@@ -895,7 +910,9 @@ impl KeyHistoryRetention {
}
}
// Ensure the latest record is readable.
collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
.await
.context("latest record")?;
Ok(())
}
}
@@ -1273,7 +1290,10 @@ impl Timeline {
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read().await;
for layer_desc in layers.layer_map()?.iter_historic_layers() {
let layers_iter = layers.layer_map()?.iter_historic_layers();
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
for layer_desc in layers_iter {
layers_checked += 1;
let layer = layers.get_from_desc(&layer_desc);
if layer.metadata().shard.shard_count == self.shard_identity.count {
// This layer does not belong to a historic ancestor, no need to re-image it.
@@ -1317,14 +1337,15 @@ impl Timeline {
continue;
}
// Don't bother re-writing a layer unless it will at least halve its size
// Only rewrite a layer if we can reclaim significant space.
if layer_local_page_count != u32::MAX
&& layer_local_page_count > layer_raw_page_count / 2
&& layer_local_page_count as f64 / layer_raw_page_count as f64
<= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
{
debug!(%layer,
"layer is already mostly local ({}/{}), not rewriting",
layer_local_page_count,
layer_raw_page_count
"layer has a large share of local pages \
({layer_local_page_count}/{layer_raw_page_count} > \
{ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting",
);
}
@@ -1336,12 +1357,19 @@ impl Timeline {
continue;
}
// We do not yet implement rewrite of delta layers.
if layer_desc.is_delta() {
// We do not yet implement rewrite of delta layers
debug!(%layer, "Skipping rewrite of delta layer");
continue;
}
// We don't bother rewriting layers that aren't visible, since these won't be needed by
// reads and will likely be garbage collected soon.
if layer.visibility() != LayerVisibilityHint::Visible {
debug!(%layer, "Skipping rewrite of invisible layer");
continue;
}
// Only rewrite layers if their generations differ. This guarantees:
// - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
// - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
@@ -1371,7 +1399,8 @@ impl Timeline {
}
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
checked {layers_checked}/{layers_total} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
layers_to_rewrite.len(),
drop_layers.len(),
@@ -1394,6 +1423,8 @@ impl Timeline {
self.tenant_shard_id,
&layer.layer_desc().key_range,
layer.layer_desc().image_layer_lsn(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -2033,6 +2064,8 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3232,6 +3265,8 @@ impl Timeline {
job_desc.compaction_key_range.start,
lowest_retain_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3248,6 +3283,8 @@ impl Timeline {
self.tenant_shard_id,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
.context("failed to create delta layer writer")
@@ -3344,6 +3381,8 @@ impl Timeline {
self.tenant_shard_id,
desc.key_range.start,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3361,6 +3400,8 @@ impl Timeline {
self.tenant_shard_id,
job_desc.compaction_key_range.end,
desc.lsn_range.clone(),
&self.gate,
self.cancel.clone(),
ctx,
)
.await
@@ -3932,6 +3973,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -4007,6 +4050,8 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

@@ -228,6 +228,8 @@ async fn generate_tombstone_image_layer(
detached.tenant_shard_id,
&key_range,
image_lsn,
&detached.gate,
detached.cancel.clone(),
ctx,
)
.await
@@ -776,6 +778,8 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
&target_timeline.gate,
target_timeline.cancel.clone(),
ctx,
)
.await

@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;

@@ -2118,9 +2118,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
*/
if (wp->config->syncSafekeepers)
{
int n_synced;
n_synced = 0;
for (int i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
@@ -2129,8 +2126,6 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
/* alive safekeeper which is not synced yet; wait for it */
if (sk->state != SS_OFFLINE && !synced)
return;
if (synced)
n_synced++;
}
if (newCommitLsn >= wp->propTermStartLsn)

@@ -27,6 +27,7 @@ humantime.workspace = true
http.workspace = true
hyper0.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true

@@ -6,6 +6,7 @@ use std::str::{self, FromStr};
use std::sync::Arc;
use anyhow::Context;
use jsonwebtoken::TokenData;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_backend::{PostgresBackend, QueryError};
@@ -278,7 +279,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.auth
.as_ref()
.expect("auth_type is configured but .auth of handler is missing");
let data = auth
let data: TokenData<Claims> = auth
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
.map_err(|e| QueryError::Unauthorized(e.0))?;

@@ -629,15 +629,13 @@ impl ComputeHook {
};
let result = if !self.config.use_local_compute_notifications {
let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
Some(if control_plane_url.ends_with('/') {
format!("{control_plane_url}notify-attach")
} else {
format!("{control_plane_url}/notify-attach")
})
} else {
self.config.compute_hook_url.clone()
};
let compute_hook_url =
self.config
.control_plane_url
.as_ref()
.map(|control_plane_url| {
format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
});
// We validate this at startup
let notify_url = compute_hook_url.as_ref().unwrap();

@@ -86,10 +86,6 @@ struct Cli {
#[arg(long)]
peer_jwt_token: Option<String>,
/// URL to control plane compute notification endpoint
#[arg(long)]
compute_hook_url: Option<String>,
/// URL to control plane storage API prefix
#[arg(long)]
control_plane_url: Option<String>,
@@ -360,13 +356,11 @@ async fn async_main() -> anyhow::Result<()> {
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
);
}
StrictMode::Strict
if args.compute_hook_url.is_none() && args.control_plane_url.is_none() =>
{
StrictMode::Strict if args.control_plane_url.is_none() => {
// Production systems should always have a control plane URL set, to prevent falling
// back to trying to use neon_local.
anyhow::bail!(
"neither `--compute-hook-url` nor `--control-plane-url` are set: this is only permitted in `--dev` mode"
"`--control-plane-url` is not set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.use_local_compute_notifications => {
@@ -394,7 +388,6 @@ async fn async_main() -> anyhow::Result<()> {
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,
control_plane_url: args.control_plane_url,
max_offline_interval: args
.max_offline_interval

@@ -357,18 +357,10 @@ pub struct Config {
// This JWT token will be used to authenticate with other storage controller instances
pub peer_jwt_token: Option<String>,
/// Where the compute hook should send notifications of pageserver attachment locations
/// (this URL points to the control plane in prod). If this is None, the compute hook will
/// assume it is running in a test environment and try to update neon_local.
pub compute_hook_url: Option<String>,
/// Prefix for storage API endpoints of the control plane. We use this prefix to compute
/// URLs that we use to send pageserver and safekeeper attachment locations.
/// If this is None, the compute hook will assume it is running in a test environment
/// and try to invoke neon_local instead.
///
/// For now, there is also `compute_hook_url` which allows configuration of the pageserver
/// specific endpoint, but it is in the process of being phased out.
pub control_plane_url: Option<String>,
/// Grace period within which a pageserver does not respond to heartbeats, but is still

@@ -3,19 +3,35 @@
* Create a Neon project on staging.
* Grant the superuser privileges to the DB user.
* (Optional) create a branch for testing
* Configure the endpoint by updating the control-plane database with the following settings:
* Add the following settings to the `pg_settings` section of the default endpoint configuration for the project using the admin interface:
* `TimeZone`: `America/Los_Angeles`
* `DateStyle`: `Postgres,MDY`
* `compute_query_id`: `off`
* Add the following section to the project configuration:
```json
"preload_libraries": {
"use_defaults": false,
"enabled_libraries": []
}
```
* Check out the current `Neon` sources
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
```bash
$ cd vendor/postgres-v17
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
```
* Set the environment variables (please modify according to your configuration):
```bash
$ export DEFAULT_PG_VERSION=17
$ export BUILD_TYPE=release
```
* Build the Neon binaries; see [README.md](../../README.md)
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
* Set the environment variable `PG_VERSION` to the version of your project.
* Update the Python dependencies with Poetry:
```bash
$ scripts/pysync
```
* Run
```bash
$ pytest -m remote_cluster -k cloud_regress
$ scripts/pytest -m remote_cluster -k cloud_regress
```
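The steps above rely on several environment variables being set before `pytest` runs. A minimal pre-flight check might look like the sketch below, assuming the variable names listed in this README are the complete set. The helper `missing_env_vars` is hypothetical and not part of the Neon test suite.

```python
import os

# Environment variables named in the setup steps of this README.
REQUIRED_VARS = ("DEFAULT_PG_VERSION", "BUILD_TYPE", "BENCHMARK_CONNSTR", "PG_VERSION")


def missing_env_vars(env=None):
    """Return the required variables that are unset or empty (hypothetical helper)."""
    # Accept an explicit mapping so the check can be exercised without
    # touching the real process environment.
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_env_vars()` with no arguments inspects the current process environment; an empty list suggests the variables from the steps above are in place.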

@@ -194,6 +194,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_wait_lsn_started_count"),
counter("pageserver_wait_lsn_finished_count"),
counter("pageserver_wait_ondemand_download_seconds_sum"),
counter("pageserver_page_service_batch_break_reason"),
*histogram("pageserver_page_service_batch_size"),
*histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,

@@ -947,6 +947,8 @@ class NeonEnvBuilder:
continue
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
continue
if FINAL_METRICS_FILE_NAME == test_file.name:
continue
log.debug(f"Removing large database {test_file} file")
test_file.unlink()
elif test_entry.is_dir():
@@ -1322,10 +1324,6 @@ class NeonEnv:
log.info("test may use old binaries, ignoring warnings about unknown config items")
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
# Allow old software to start until https://github.com/neondatabase/neon/pull/11275
# lands in the compatiblity snapshot.
ps_cfg["page_service_pipelining"].pop("batching")
self.pageservers.append(ps)
cfg["pageservers"].append(ps_cfg)
@@ -1461,6 +1459,12 @@ class NeonEnv:
except Exception as e:
metric_errors.append(e)
log.error(f"metric validation failed on {pageserver.id}: {e}")
try:
pageserver.snapshot_final_metrics()
except Exception as e:
log.error(f"metric snapshot failed on {pageserver.id}: {e}")
try:
pageserver.stop(immediate=immediate)
except RuntimeError:
@@ -2976,6 +2980,20 @@ class NeonPageserver(PgProtocol, LogUtils):
value = self.http_client().get_metric_value(metric)
assert value == 0, f"Nonzero {metric} == {value}"
def snapshot_final_metrics(self):
"""
Take a snapshot of this pageserver's metrics and stash in its work directory.
"""
if not self.running:
log.info(f"Skipping metrics snapshot on pageserver {self.id}, it is not running")
return
metrics = self.http_client().get_metrics_str()
metrics_snapshot_path = self.workdir / FINAL_METRICS_FILE_NAME
with open(metrics_snapshot_path, "w") as f:
f.write(metrics)
def tenant_attach(
self,
tenant_id: TenantId,
@@ -5138,6 +5156,8 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile(
r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)"
)
FINAL_METRICS_FILE_NAME: str = "final_metrics.txt"
SKIP_DIRS = frozenset(
(

@@ -199,7 +199,7 @@ def wait_for_last_record_lsn(
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
current_lsn = Lsn(0)
for i in range(1000):
for i in range(2000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn

@@ -1,7 +1,6 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
@@ -170,6 +169,7 @@ def test_throughput(
time: float
pageserver_batch_size_histo_sum: float
pageserver_batch_size_histo_count: float
pageserver_batch_breaks_reason_count: dict[str, int]
compute_getpage_count: float
pageserver_cpu_seconds_total: float
@@ -183,6 +183,10 @@ def test_throughput(
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
- other.pageserver_cpu_seconds_total,
pageserver_batch_breaks_reason_count={
reason: count - other.pageserver_batch_breaks_reason_count.get(reason, 0)
for reason, count in self.pageserver_batch_breaks_reason_count.items()
},
)
def normalize(self, by) -> "Metrics":
@@ -192,6 +196,10 @@ def test_throughput(
pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
compute_getpage_count=self.compute_getpage_count / by,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
pageserver_batch_breaks_reason_count={
reason: count / by
for reason, count in self.pageserver_batch_breaks_reason_count.items()
},
)
def get_metrics() -> Metrics:
@@ -201,6 +209,20 @@ def test_throughput(
)
compute_getpage_count = cur.fetchall()[0][0]
pageserver_metrics = ps_http.get_metrics()
for name, samples in pageserver_metrics.metrics.items():
for sample in samples:
log.info(f"{name=} labels={sample.labels} {sample.value}")
raw_batch_break_reason_count = pageserver_metrics.query_all(
"pageserver_page_service_batch_break_reason_total",
filter={"timeline_id": str(env.initial_timeline)},
)
batch_break_reason_count = {
sample.labels["reason"]: int(sample.value)
for sample in raw_batch_break_reason_count
}
return Metrics(
time=time.time(),
pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
@@ -209,6 +231,7 @@ def test_throughput(
pageserver_batch_size_histo_count=pageserver_metrics.query_one(
"pageserver_page_service_batch_size_count"
).value,
pageserver_batch_breaks_reason_count=batch_break_reason_count,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"libmetrics_process_cpu_seconds_highres"
@@ -263,25 +286,6 @@ def test_throughput(
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#
@@ -295,7 +299,16 @@ def test_throughput(
#
for metric, value in dataclasses.asdict(metrics).items():
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
if metric == "pageserver_batch_breaks_reason_count":
assert isinstance(value, dict)
for reason, count in value.items():
zenbenchmark.record(
f"counters.{metric}_{reason}", count, unit="", report=MetricReport.TEST_PARAM
)
else:
zenbenchmark.record(
f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM
)
zenbenchmark.record(
"perfmetric.batching_factor",
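The per-reason counter handling in `Metrics.__sub__` and `Metrics.normalize` above comes down to two dictionary comprehensions. A standalone sketch follows, with hypothetical function names `counter_delta` and `normalize_counters`, and assuming a reason missing from the earlier snapshot counts as zero, as the `.get(reason, 0)` call in the diff does:

```python
def counter_delta(after, before):
    """Per-reason difference between two counter snapshots.

    Reasons that only appear in `after` are treated as starting from 0.
    Reasons that only appear in `before` are dropped, matching the diff above.
    """
    return {reason: count - before.get(reason, 0) for reason, count in after.items()}


def normalize_counters(counters, by):
    """Divide every per-reason count by `by` (for example, elapsed seconds)."""
    return {reason: count / by for reason, count in counters.items()}
```

Because both helpers iterate over the later snapshot only, a reason label that first shows up mid-run still produces a value instead of raising a `KeyError`.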

@@ -97,6 +97,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
_record_branch_creation_durations(neon_compare, branch_creation_durations)
@pytest.mark.timeout(1000)
@pytest.mark.parametrize("n_branches", [500, 1024])
@pytest.mark.parametrize("shape", ["one_ancestor", "random"])
def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape: str):
@@ -205,7 +206,7 @@ def wait_and_record_startup_metrics(
assert len(matching) == len(expected_labels)
return matching
samples = wait_until(metrics_are_filled)
samples = wait_until(metrics_are_filled, timeout=60)
for sample in samples:
phase = sample.labels["phase"]

@@ -52,6 +52,8 @@ def test_ingest_insert_bulk(
# would compete with Pageserver for bandwidth.
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
neon_env_builder.pageserver_config_override = "wait_lsn_timeout='600 s'"
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
env = neon_env_builder.init_start()
@@ -92,7 +94,18 @@ def test_ingest_insert_bulk(
worker_rows = rows / CONCURRENCY
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
for attempt in range(5):
try:
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
break
except Exception as e:
# if we disable backpressure, postgres can become unresponsive for longer than a minute
# and new connection attempts time out in postgres after 1 minute
# so if this happens we retry new connection
log.error(f"Attempt {attempt + 1}/5: Failed to select current wal lsn: {e}")
if attempt == 4:
log.error("Exceeded maximum retry attempts for selecting current wal lsn")
raise
# Wait for pageserver to ingest the WAL.
client = env.pageserver.http_client()
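Reviewer note: the retry loop around `pg_current_wal_lsn()` in the hunk above could be factored into a small helper if the pattern is needed elsewhere. The sketch below is one possible shape, not the code in this change. `retry_call` and its parameters are hypothetical names, and the optional delay between attempts is an addition that the hunk does not have.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


def retry_call(fn: Callable[[], T], attempts: int = 5, delay_s: float = 0.0) -> T:
    """Call fn() up to `attempts` times and re-raise the last failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as e:
            log.error("Attempt %d/%d failed: %s", attempt + 1, attempts, e)
            if attempt == attempts - 1:
                # Out of attempts: propagate the original exception.
                raise
            if delay_s > 0:
                time.sleep(delay_s)
    raise AssertionError("unreachable")
```

With such a helper, the call site would reduce to something like `end_lsn = Lsn(retry_call(lambda: endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]))`, where `Lsn` and `endpoint` are the objects already used in the test.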

@@ -64,8 +64,8 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -81,7 +81,7 @@ def test_ro_replica_lag(
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
log.info("Replica endpoint ID: {}", replica["endpoint"]["id"])
log.info("Replica endpoint ID: %s", replica["endpoint"]["id"])
replica_env = master_env.copy()
replica_env["PGHOST"] = replica["endpoint"]["host"]
neon_api.wait_for_operation_to_finish(project_id)
@@ -197,8 +197,8 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: {}", project_id)
log.info("Primary endpoint ID: {}", project["project"]["endpoints"][0]["id"])
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]
@@ -215,7 +215,7 @@ def test_replication_start_stop(
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
log.info("Replica {} endpoint ID: {}", i + 1, replica["endpoint"]["id"])
log.info("Replica %d endpoint ID: %s", i + 1, replica["endpoint"]["id"])
replicas.append(replica)
neon_api.wait_for_operation_to_finish(project_id)
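Reviewer note: the `{}` to `%s` changes in this file matter because the standard `logging` module merges the message and its arguments with the `%` operator, so `{}` placeholders are never filled in. The sketch below demonstrates this with a bare `LogRecord`, assuming the test suite's `log` object is a standard-library logger. The helper `render` and the sample IDs are hypothetical.

```python
import logging


def render(msg: str, *args: object) -> str:
    """Format msg and args the way the logging module does (msg % args)."""
    record = logging.LogRecord("demo", logging.INFO, "demo.py", 0, msg, args, None)
    return record.getMessage()


# %-style placeholders are substituted as expected.
ok = render("Replica %d endpoint ID: %s", 1, "ep-example")

# "{}" placeholders are not understood by %-formatting, so the merge fails.
try:
    render("Project ID: {}", "proj-example")
    brace_style_failed = False
except TypeError:
    brace_style_failed = True
```

In a real logger the `TypeError` is caught by the handler and reported on stderr as a logging error, so the intended message never reaches the log.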

@@ -13,7 +13,7 @@ from fixtures.neon_fixtures import (
)
@pytest.mark.timeout(600)
@pytest.mark.timeout(1200)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
@pytest.mark.parametrize(
"wal_receiver_protocol",

@@ -390,6 +390,7 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
# Tenant creation requests which arrive out of order will generate complaints about
# generation numbers out of order.
env.pageserver.allowed_errors.append(".*Generation .+ is less than existing .+")
env.pageserver.allowed_errors.append(".*due to stale generation.+")
# Timeline::flush_and_shutdown cannot tell if it is hitting a failure because of
# an incomplete attach, or some other problem. In the field this should be rare,

@@ -70,6 +70,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
pkcs8 = { version = "0.10", default-features = false, features = ["pem", "std"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }