Compare commits

...

24 Commits

Author SHA1 Message Date
Vlad Lazar
82e3866c27 pageserver: rate limited warning on too many layers visited 2024-04-22 17:32:12 +01:00
Vlad Lazar
7a8effc190 tests: add a compaction smoke test 2024-04-22 17:32:12 +01:00
Vlad Lazar
35e299d308 pageserver: add a metric counting layer visits on vectored read path 2024-04-22 17:32:10 +01:00
Vlad Lazar
ea5b36c47a pageserver: tweak metric counting layer visits on non-vectored read path
* Give it a better name
* Track all layers
* Larger buckets
2024-04-22 17:32:05 +01:00
Alex Chi Z
25d9dc6eaf chore(pageserver): separate missing key error (#7393)
As part of https://github.com/neondatabase/neon/pull/7375 and to improve
the current vectored get implementation, we separate the missing key
error out. This also saves us several Box allocations in the get page
implementation.

## Summary of changes

* Create a caching field of layer traversal id for each of the layer.
* Remove box allocations for layer traversal id retrieval and implement
MissingKey error message as before. This should be a little bit faster.
* Do not format error message until `Display`.
* For in-mem layer, the descriptor is different before/after frozen. I'm
using once lock for that.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-04-22 10:40:35 -04:00
Christian Schwarz
139d1346d5 pagectl draw-timeline-dir: include layer file name as an SVG comment (#7455)
fixes https://github.com/neondatabase/neon/issues/7452

Also, drive-by improve the usage instructions with commands I found
useful during that incident.

The patch in the fork of `svg_fmt` is [being
upstreamed](https://github.com/nical/rust_debug/pull/4), but, in the
meantime,
let's commit what we have because it was useful during the incident.
2024-04-22 12:55:17 +00:00
John Spray
0bd16182f7 pageserver: fix unlogged relations with sharding (#7454)
## Problem

- #7451 

INIT_FORKNUM blocks must be stored on shard 0 to enable including them
in basebackup.

This issue can be missed in simple tests because creating an unlogged
table isn't sufficient -- to repro I had to create an _index_ on an
unlogged table (then restart the endpoint).

Closes: #7451 

## Summary of changes

- Add a reproducer for the issue.
- Tweak the condition for `key_is_shard0` to include anything that isn't
a normal relation block _and_ any normal relation block whose forknum is
INIT_FORKNUM.
- To enable existing databases to recover from the issue, add a special
case that omits relations if they were stored on the wrong INITFORK.
This enables postgres to start and the user to drop the table and
recreate it.
2024-04-22 11:47:24 +00:00
Anna Khanova
6a5650d40c proxy: Make retries configurable and record it. (#7438)
## Problem

Currently we cannot configure retries, also, we don't really have
visibility of what's going on there.

## Summary of changes

* Added cli params
* Improved logging
* Decrease the number of retries: it feels like most of retries doesn't
help. Once there would be better errors handling, we can increase it
back.
2024-04-22 11:37:22 +00:00
Joonas Koivunen
47addc15f1 relaxation: allow using layers across timelines (#7453)
Before, we asserted that a layer would only be loaded by the timeline
that initially created it. Now, with the ancestor detach, we will want
to utilize remote copy as much as possible, so we will need to open
other timeline layers as our own.

Cc: #6994
2024-04-22 13:04:37 +03:00
Joonas Koivunen
b91c58a8bf refactor(Timeline): simpler metadata updates (#7422)
Currently, any `Timeline::schedule_uploads` will generate a fresh
`TimelineMetadata` instead of updating the values, which it means to
update. This makes it impossible for #6994 to work while `Timeline`
receives layer flushes by overwriting any configured new
`ancestor_timeline_id` and possible `ancestor_lsn`.

The solution is to only make full `TimelineMetadata` "updates" from one
place: branching. At runtime, update only the three fields, same as
before in `Timeline::schedule_updates`.
2024-04-22 11:57:14 +03:00
Heikki Linnakangas
00d9c2d9a8 Make another walcraft test more robust (#7439)
There were two issues with the test at page boundaries:

1. If the first logical message with 10 bytes payload crossed a page
boundary, the calculated 'base_size' was too large because it included
the page header.

2. If it was inserted near the end of a page so that there was not
enough room for another one, we did "remaining_lsn += XLOG_BLCKSZ" but
that didn't take into account the page headers either.

As a result, the test would fail if the WAL insert position at the
beginning of the test was too close to the end of a WAL page. Fix the
calculations by repeating the 10-byte logical message if the starting
position is not suitable.

I bumped into this with PR #7377; it changed the arguments of a few SQL
functions in neon_test_utils extension, which changed the WAL positions
slightly, and caused a test failure.


This is similar to https://github.com/neondatabase/neon/pull/7436, but
for different test.
2024-04-22 10:58:28 +03:00
Heikki Linnakangas
3a673dce67 Make test less sensitive to exact WAL positions (#7436)
As noted in the comment, the craft_internal() function fails if the
inserted WAL happens to land at page boundary. I bumped into that with
PR #7377; it changed the arguments of a few SQL functions in
neon_test_utils extension, which changed the WAL positions slightly, and
caused a test failure.
2024-04-22 10:58:10 +03:00
Em Sharnoff
35e9fb360b Bump vm-builder v0.23.2 -> v0.28.1 (#7433)
Only one relevant change, from v0.28.0:

- neondatabase/autoscaling#887

Double-checked with `git log neonvm/tools/vm-builder`.
2024-04-21 17:35:01 -07:00
Heikki Linnakangas
0d21187322 update rustls
## Problem

`cargo deny check` is complaining about our rustls versions, causing
CI to fail:

```
error[vulnerability]: `rustls::ConnectionCommon::complete_io` could fall into an infinite loop based on network input
    ┌─ /__w/neon/neon/Cargo.lock:395:1
    │
395 │ rustls 0.21.9 registry+https://github.com/rust-lang/crates.io-index
    │ ------------------------------------------------------------------- security vulnerability detected
    │
    = ID: RUSTSEC-2024-0336
    = Advisory: https://rustsec.org/advisories/RUSTSEC-2024-0336
    = If a `close_notify` alert is received during a handshake, `complete_io`
      does not terminate.

      Callers which do not call `complete_io` are not affected.

      `rustls-tokio` and `rustls-ffi` do not call `complete_io`
      and are not affected.

      `rustls::Stream` and `rustls::StreamOwned` types use
      `complete_io` and are affected.
    = Announcement: https://github.com/rustls/rustls/security/advisories/GHSA-6g7w-8wpp-frhj
    = Solution: Upgrade to >=0.23.5 OR >=0.22.4, <0.23.0 OR >=0.21.11, <0.22.0 (try `cargo update -p rustls`)

error[vulnerability]: `rustls::ConnectionCommon::complete_io` could fall into an infinite loop based on network input
    ┌─ /__w/neon/neon/Cargo.lock:396:1
    │
396 │ rustls 0.22.2 registry+https://github.com/rust-lang/crates.io-index
    │ ------------------------------------------------------------------- security vulnerability detected
    │
    = ID: RUSTSEC-2024-0336
    = Advisory: https://rustsec.org/advisories/RUSTSEC-2024-0336
    = If a `close_notify` alert is received during a handshake, `complete_io`
      does not terminate.

      Callers which do not call `complete_io` are not affected.

      `rustls-tokio` and `rustls-ffi` do not call `complete_io`
      and are not affected.

      `rustls::Stream` and `rustls::StreamOwned` types use
      `complete_io` and are affected.
    = Announcement: https://github.com/rustls/rustls/security/advisories/GHSA-6g7w-8wpp-frhj
    = Solution: Upgrade to >=0.23.5 OR >=0.22.4, <0.23.0 OR >=0.21.11, <0.22.0 (try `cargo update -p rustls`)
```

## Summary of changes

`cargo update -p rustls@0.21.9 -p rustls@0.22.2`
2024-04-21 21:10:05 +01:00
Alexander Bayandin
e8a98adcd0 CI: downgrade docker/setup-buildx-action to v2
- Cleanup part for `docker/setup-buildx-action` started to fail with the following error (for no obvious reason):
```
/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175
            throw new Error(`Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`);
^
Error: Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.
    at Object.rejected (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175:1)
    at Generator.next (<anonymous>)
    at fulfilled (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:29:1)
```

- Downgrade `docker/setup-buildx-action` from v3 to v2
2024-04-21 21:10:05 +01:00
John Spray
98be8b9430 storcon_cli: tenant-warmup command (#7432)
## Problem

When we migrate a large existing tenant, we would like to be able to
ensure it has pre-loaded layers onto a pageserver managed by the storage
controller.

## Summary of changes

- Add `storcon_cli tenant-warmup`, which configures the tenant into
PlacementPolicy::Secondary (unless it's already attached), and then
polls the secondary download API reporting progress.
- Extend a test case to check that when onboarding with a secondary
location pre-created, we properly use that location for our first
attachment.
2024-04-19 12:32:58 +01:00
Vlad Lazar
6eb946e2de pageserver: fix cont lsn jump on vectored read path (#7412)
## Problem
Vectored read path may return an image that's newer than the request lsn
under certain circumstances.
```
  LSN
    ^
    |
    |
500 | ------------------------- -> branch point
400 |        X
300 |        X
200 | ------------------------------------> requested lsn
100 |        X
    |---------------------------------> Key

Legend:
* X - page images
```

The vectored read path inspects each ancestor timeline one by one
starting from the current one.
When moving into the ancestor timeline, the current code resets the
current search lsn (called `cont_lsn` in code)
to the lsn of the ancestor timeline
([here](d5708e7435/pageserver/src/tenant/timeline.rs (L2971))).

For instance, if the request lsn was 200, we would:
1. Look into the current timeline and find nothing for the key
2. Descend into the ancestor timeline and set `cont_lsn=500`
3. Return the page image at LSN 400

Myself and Christian find it very unlikely for this to have happened in
prod since the vectored read path
is always used at the last record lsn.

This issue was found by a regress test during the work to migrate get
page handling to use the vectored
implementation. I've applied my fix to that wip branch and it fixed the
issue.

## Summary of changes
The fix is to set the current search lsn to the min between the
requested LSN and the ancestor lsn.
Hence, at step 2 above we would set the current search lsn to 200 and
ignore the images above that.

A test illustrating the bug is also included. Fails without the patch
and passes with it.
2024-04-18 18:40:30 +01:00
dependabot[bot]
681a04d287 build(deps): bump aiohttp from 3.9.2 to 3.9.4 (#7429) 2024-04-18 16:47:34 +00:00
Joonas Koivunen
3df67bf4d7 fix(Layer): metric regression with too many canceled evictions (#7363)
#7030 introduced an annoying papercut, deeming a failure to acquire a
strong reference to `LayerInner` from `DownloadedLayer::drop` as a
canceled eviction. Most of the time, it wasn't that, but just timeline
deletion or tenant detach with the layer not wanting to be deleted or
evicted.

When a Layer is dropped as part of a normal shutdown, the `Layer` is
dropped first, and the `DownloadedLayer` the second. Because of this, we
cannot detect eviction being canceled from the `DownloadedLayer::drop`.
We can detect it from `LayerInner::drop`, which this PR adds.

Test case is added which before had 1 started eviction, 2 canceled. Now
it accurately finds 1 started, 1 canceled.
2024-04-18 15:27:58 +00:00
John Spray
0d8e68003a Add a docs page for storage controller (#7392)
## Problem

External contributors need information on how to use the storage
controller.

## Summary of changes

- Background content on what the storage controller is.
- Deployment information on how to use it.

This is not super-detailed, but should be enough for a well motivated
third party to get started, with an occasional peek at the code.
2024-04-18 13:45:25 +00:00
John Spray
637ad4a638 pageserver: fix secondary download scheduling (#7396)
## Problem

Some tenants were observed to stop doing downloads after some time

## Summary of changes

- Fix a rogue `<` that was incorrectly scheduling work when `now` was
_before_ the scheduling target, rather than after. This usually resulted
in too-frequent execution, but could also result in never executing, if
the current time has advanced ahead of `next_download` at the time we
call `schedule()`.
- Fix in-memory list of timelines not being amended after timeline
deletion: the resulted in repeated harmless logs about the timeline
being removed, and redundant calls to remove_dir_all for the timeline
path.
- Add a log at startup to make it easier to see a particular tenant
starting in secondary mode (this is for parity with the logging that
exists when spawning an attached tenant). Previously searching on tenant
ID didn't provide a clear signal as to how the tenant was started during
pageserver start.
- Add a test that exercises secondary downloads using the background
scheduling, whereas existing tests were using the API hook to invoke
download directly.
2024-04-18 13:16:03 +01:00
Joonas Koivunen
8d0f701767 feat: copy delta layer prefix or "truncate" (#7228)
For "timeline ancestor merge" or "timeline detach," we need to "cut"
delta layers at particular LSN. The name "truncate" is not used as it
would imply that a layer file changes, instead of what happens: we copy
keys with Lsn less than a "cut point".

Cc: #6994 

Add the "copy delta layer prefix" operation to DeltaLayerInner, re-using
some of the vectored read internals. The code is `cfg(test)` until it
will be used later with a more complete integration test.
2024-04-18 10:43:04 +03:00
Anna Khanova
5191f6ef0e proxy: Record only valid rejected events (#7415)
## Problem

Sometimes rejected metric might record invalid events.

## Summary of changes

* Only record it `rejected` was explicitly set.
* Change order in logs.
* Report metrics if not under high-load.
2024-04-18 06:09:12 +01:00
Conrad Ludgate
a54ea8fb1c proxy: move endpoint rate limiter (#7413)
## Problem

## Summary of changes

Rate limit for wake_compute calls
2024-04-18 06:00:33 +01:00
53 changed files with 2332 additions and 439 deletions

View File

@@ -735,7 +735,7 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
- uses: docker/setup-buildx-action@v2
- uses: docker/login-action@v3
with:
@@ -792,7 +792,7 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
- uses: docker/setup-buildx-action@v2
with:
# Disable parallelism for docker buildkit.
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
@@ -865,7 +865,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.23.2
VM_BUILDER_VERSION: v0.28.1
steps:
- name: Checkout

35
Cargo.lock generated
View File

@@ -599,7 +599,7 @@ dependencies = [
"once_cell",
"pin-project-lite",
"pin-utils",
"rustls 0.21.9",
"rustls 0.21.11",
"tokio",
"tracing",
]
@@ -2519,7 +2519,7 @@ dependencies = [
"http 0.2.9",
"hyper 0.14.26",
"log",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-native-certs 0.6.2",
"tokio",
"tokio-rustls 0.24.0",
@@ -4059,7 +4059,7 @@ dependencies = [
"futures",
"once_cell",
"pq_proto",
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"serde",
"thiserror",
@@ -4350,7 +4350,7 @@ dependencies = [
"routerify",
"rstest",
"rustc-hash",
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"scopeguard",
"serde",
@@ -4542,7 +4542,7 @@ dependencies = [
"itoa",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
@@ -4696,7 +4696,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
@@ -4956,9 +4956,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.9"
version = "0.21.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
dependencies = [
"log",
"ring 0.17.6",
@@ -4968,9 +4968,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.22.2"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
dependencies = [
"log",
"ring 0.17.6",
@@ -5282,7 +5282,7 @@ checksum = "2e95efd0cefa32028cdb9766c96de71d96671072f9fb494dc9fb84c0ef93e52b"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.21.9",
"rustls 0.21.11",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -5830,8 +5830,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83ba502a3265efb76efb89b0a2f7782ad6f2675015d4ce37e4b547dda42b499"
source = "git+https://github.com/neondatabase/fork--nical--rust_debug?branch=neon#b9501105e746629004bc6d0473639320939dbe10"
[[package]]
name = "syn"
@@ -6193,7 +6192,7 @@ checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
dependencies = [
"futures",
"ring 0.17.6",
"rustls 0.22.2",
"rustls 0.22.4",
"tokio",
"tokio-postgres",
"tokio-rustls 0.25.0",
@@ -6206,7 +6205,7 @@ version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
dependencies = [
"rustls 0.21.9",
"rustls 0.21.11",
"tokio",
]
@@ -6216,7 +6215,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-pki-types",
"tokio",
]
@@ -6677,7 +6676,7 @@ dependencies = [
"base64 0.21.1",
"log",
"once_cell",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-webpki 0.100.2",
"url",
"webpki-roots 0.23.1",
@@ -7354,7 +7353,7 @@ dependencies = [
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.21.9",
"rustls 0.21.11",
"scopeguard",
"serde",
"serde_json",

View File

@@ -157,7 +157,8 @@ socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
"subtle" = "2.5.0"
svg_fmt = "0.4.1"
# https://github.com/nical/rust_debug/pull/4
svg_fmt = { git = "https://github.com/neondatabase/fork--nical--rust_debug", branch = "neon" }
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"

View File

@@ -1,15 +1,15 @@
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use hyper::Method;
use hyper::{Method, StatusCode};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
LocationConfigSecondary, ShardParameters, TenantConfig, TenantConfigRequest,
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
};
@@ -120,6 +120,12 @@ enum Command {
#[arg(long)]
tenant_id: TenantId,
},
/// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
/// mode so that it can warm up content on a pageserver.
TenantWarmup {
#[arg(long)]
tenant_id: TenantId,
},
}
#[derive(Parser)]
@@ -581,6 +587,94 @@ async fn main() -> anyhow::Result<()> {
}
println!("{table}");
}
Command::TenantWarmup { tenant_id } => {
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await;
match describe_response {
Ok(describe) => {
if matches!(describe.policy, PlacementPolicy::Secondary) {
// Fine: it's already known to controller in secondary mode: calling
// again to put it into secondary mode won't cause problems.
} else {
anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
// Fine: this tenant isn't know to the storage controller yet.
}
Err(e) => {
// Unexpected API error
return Err(e.into());
}
}
vps_client
.location_config(
TenantShardId::unsharded(tenant_id),
pageserver_api::models::LocationConfig {
mode: pageserver_api::models::LocationConfigMode::Secondary,
generation: None,
secondary_conf: Some(LocationConfigSecondary { warm: true }),
shard_number: 0,
shard_count: 0,
shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
tenant_conf: TenantConfig::default(),
},
None,
true,
)
.await?;
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await?;
let secondary_ps_id = describe_response
.shards
.first()
.unwrap()
.node_secondary
.first()
.unwrap();
println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
loop {
let (status, progress) = vps_client
.tenant_secondary_download(
TenantShardId::unsharded(tenant_id),
Some(Duration::from_secs(10)),
)
.await?;
println!(
"Progress: {}/{} layers, {}/{} bytes",
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
match status {
StatusCode::OK => {
println!("Download complete");
break;
}
StatusCode::ACCEPTED => {
// Loop
}
_ => {
anyhow::bail!("Unexpected download status: {status}");
}
}
}
}
}
Ok(())

150
docs/storage_controller.md Normal file
View File

@@ -0,0 +1,150 @@
# Storage Controller
## Concepts
The storage controller sits between administrative API clients and pageservers, and handles the details of mapping tenants to pageserver tenant shards. For example, creating a tenant is one API call to the storage controller,
which is mapped into many API calls to many pageservers (for multiple shards, and for secondary locations).
It implements a pageserver-compatible API that may be used for CRUD operations on tenants and timelines, translating these requests into appropriate operations on the shards within a tenant, which may be on many different pageservers. Using this API, the storage controller may be used in the same way as the pageserver's administrative HTTP API, hiding
the underlying details of how data is spread across multiple nodes.
The storage controller also manages generations, high availability (via secondary locations) and live migrations for tenants under its management. This is done with a reconciliation loop pattern, where tenants have an “intent” state and a “reconcile” task that tries to make the outside world match the intent.
## APIs
The storage controllers HTTP server implements four logically separate APIs:
- `/v1/...` path is the pageserver-compatible API. This has to be at the path root because thats where clients expect to find it on a pageserver.
- `/control/v1/...` path is the storage controllers API, which enables operations such as registering and management pageservers, or executing shard splits.
- `/debug/v1/...` path contains endpoints which are either exclusively used in tests, or are for use by engineers when supporting a deployed system.
- `/upcall/v1/...` path contains endpoints that are called by pageservers. This includes the `/re-attach` and `/validate` APIs used by pageservers
to ensure data safety with generation numbers.
The API is authenticated with a JWT token, and tokens must have scope `pageserverapi` (i.e. the same scope as pageservers APIs).
See the `http.rs` file in the source for where the HTTP APIs are implemented.
## Database
The storage controller uses a postgres database to persist a subset of its state. Note that the storage controller does _not_ keep all its state in the database: this is a design choice to enable most operations to be done efficiently in memory, rather than having to read from the database. See `persistence.rs` for a more comprehensive comment explaining what we do and do not persist: a useful metaphor is that we persist objects like tenants and nodes, but we do not
persist the _relationships_ between them: the attachment state of a tenant's shards to nodes is kept in memory and
rebuilt on startup.
The file `[persistence.rs](http://persistence.rs)` contains all the code for accessing the database, and has a large doc comment that goes into more detail about exactly what we persist and why.
The `diesel` crate is used for defining models & migrations.
Running a local cluster with `cargo neon` automatically starts a vanilla postgress process to host the storage controllers database.
### Diesel tip: migrations
If you need to modify the database schema, heres how to create a migration:
- Install the diesel CLI with `cargo install diesel_cli`
- Use `diesel migration generate <name>` to create a new migration
- Populate the SQL files in the `migrations/` subdirectory
- Use `DATABASE_URL=... diesel migration run` to apply the migration you just wrote: this will update the `[schema.rs](http://schema.rs)` file automatically.
- This requires a running database: the easiest way to do that is to just run `cargo neon init ; cargo neon start`, which will leave a database available at `postgresql://localhost:1235/attachment_service`
- Commit the migration files and the changes to schema.rs
- If you need to iterate, you can rewind migrations with `diesel migration revert -a` and then `diesel migration run` again.
- The migrations are build into the storage controller binary, and automatically run at startup after it is deployed, so once youve committed a migration no further steps are needed.
## storcon_cli
The `storcon_cli` tool enables interactive management of the storage controller. This is usually
only necessary for debug, but may also be used to manage nodes (e.g. marking a node as offline).
`storcon_cli --help` includes details on commands.
# Deploying
This section is aimed at engineers deploying the storage controller outside of Neon's cloud platform, as
part of a self-hosted system.
_General note: since the default `neon_local` environment includes a storage controller, this is a useful
reference when figuring out deployment._
## Database
It is **essential** that the database used by the storage controller is durable (**do not store it on ephemeral
local disk**). This database contains pageserver generation numbers, which are essential to data safety on the pageserver.
The resource requirements for the database are very low: a single CPU core and 1GiB of memory should work well for most deployments. The physical size of the database is typically under a gigabyte.
Set the URL to the database using the `--database-url` CLI option.
There is no need to run migrations manually: the storage controller automatically applies migrations
when it starts up.
## Configure pageservers to use the storage controller
1. The pageserver `control_plane_api` and `control_plane_api_token` should be set in the `pageserver.toml` file. The API setting should
point to the "upcall" prefix, for example `http://127.0.0.1:1234/upcall/v1/` is used in neon_local clusters.
2. Create a `metadata.json` file in the same directory as `pageserver.toml`: this enables the pageserver to automatically register itself
with the storage controller when it starts up. See the example below for the format of this file.
### Example `metadata.json`
```
{"host":"acmehost.localdomain","http_host":"acmehost.localdomain","http_port":9898,"port":64000}
```
- `port` and `host` refer to the _postgres_ port and host, and these must be accessible from wherever
postgres runs.
- `http_port` and `http_host` refer to the pageserver's HTTP api, this must be accessible from where
the storage controller runs.
## Handle compute notifications.
The storage controller independently moves tenant attachments between pageservers in response to
changes such as a pageserver node becoming unavailable, or the tenant's shard count changing. To enable
postgres clients to handle such changes, the storage controller calls an API hook when a tenant's pageserver
location changes.
The hook is configured using the storage controller's `--compute-hook-url` CLI option. If the hook requires
JWT auth, the token may be provided with `--control-plane-jwt-token`. The hook will be invoked with a `PUT` request.
In the Neon cloud service, this hook is implemented by Neon's internal cloud control plane. In `neon_local` systems
the storage controller integrates directly with neon_local to reconfigure local postgres processes instead of calling
the compute hook.
When implementing an on-premise Neon deployment, you must implement a service that handles the compute hook. This is not complicated:
the request body has format of the `ComputeHookNotifyRequest` structure, provided below for convenience.
```
struct ComputeHookNotifyRequestShard {
node_id: NodeId,
shard_number: ShardNumber,
}
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
```
When a notification is received:
1. Modify postgres configuration for this tenant:
- set `neon.pageserver_connstr` to a comma-separated list of postgres connection strings to pageservers according to the `shards` list. The
shards identified by `NodeId` must be converted to the address+port of the node.
- if stripe_size is not None, set `neon.stripe_size` to this value
2. Send SIGHUP to postgres to reload configuration
3. Respond with 200 to the notification request. Do not return success if postgres was not updated: if an error is returned, the controller
will retry the notification until it succeeds..
### Example notification body
```
{
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
"stripe_size": 32768,
"shards": [
{"node_id": 344, "shard_number": 0},
{"node_id": 722, "shard_number": 1},
],
}
```

View File

@@ -5,6 +5,7 @@ use crate::{
models::ShardParameters,
};
use hex::FromHex;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
@@ -537,6 +538,24 @@ impl ShardIdentity {
}
}
/// Special case for issue `<https://github.com/neondatabase/neon/issues/7451>`
///
/// When we fail to read a forknum block, this function tells us whether we may ignore the error
/// as a symptom of that issue.
pub fn is_key_buggy_forknum(&self, key: &Key) -> bool {
if !is_rel_block_key(key) || key.field5 != INIT_FORKNUM {
return false;
}
let mut hash = murmurhash32(key.field4);
hash = hash_combine(hash, murmurhash32(key.field6 / self.stripe_size.0));
let mapped_shard = ShardNumber((hash % self.count.0 as u32) as u8);
// The key may be affected by issue #7454: it is an initfork and it would not
// have mapped to shard 0 until we fixed that issue.
mapped_shard != ShardNumber(0)
}
/// Return true if the key should be discarded if found in this shard's
/// data store, e.g. during compaction after a split.
///
@@ -649,7 +668,13 @@ fn key_is_shard0(key: &Key) -> bool {
// relation pages are distributed to shards other than shard zero. Everything else gets
// stored on shard 0. This guarantees that shard 0 can independently serve basebackup
// requests, and any request other than those for particular blocks in relations.
!is_rel_block_key(key)
//
// The only exception to this rule is "initfork" data -- this relates to postgres's UNLOGGED table
// type. These are special relations, usually with only 0 or 1 blocks, and we store them on shard 0
// because they must be included in basebackups.
let is_initfork = key.field5 == INIT_FORKNUM;
!is_rel_block_key(key) || is_initfork
}
/// Provide the same result as the function in postgres `hashfn.h` with the same name

View File

@@ -118,7 +118,9 @@ pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
pub use v14::xlog_utils::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
pub use v14::bindings::{CheckPoint, ControlFileData};

View File

@@ -4,7 +4,9 @@ use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
@@ -262,11 +264,21 @@ fn craft_internal<C: postgres::GenericClient>(
intermediate_lsns.insert(0, initial_lsn);
}
// Some records may be not flushed, e.g. non-transactional logical messages.
// Some records may be not flushed, e.g. non-transactional logical messages. Flush now.
//
// Note: this is broken if pg_current_wal_insert_lsn is at page boundary
// because pg_current_wal_insert_lsn skips page headers.
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
// If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
// returns the position just after the page header on the next page. That's where the next
// record will be inserted. But the page header hasn't actually been written to the WAL
// yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
// error. Because of that, if the insert location is just after a page header, back off to
// previous page boundary.
let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
} else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
}
client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
Ok(intermediate_lsns)
}
@@ -320,38 +332,49 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
client.execute("CREATE table t(x int)", &[])?;
// Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary.
// We will use logical message as the padding. We start with detecting how much WAL
// it takes for one logical message, considering all alignments and headers.
let base_wal_advance = {
// Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary. We
// will use carefully-sized logical messages to advance WAL insert location such
// that there is just enough space on the page for the XLOG_SWITCH record.
loop {
// We start with measuring how much WAL it takes for one logical message,
// considering all alignments and headers.
let before_lsn = client.pg_current_wal_insert_lsn()?;
// Small non-empty message bigger than few bytes is more likely than an empty
// message to have the same format as the big padding message.
client.execute(
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
&[],
)?;
// The XLOG_SWITCH record has no data => its size is exactly XLOG_SIZE_OF_XLOG_RECORD.
(u64::from(client.pg_current_wal_insert_lsn()?) - u64::from(before_lsn)) as usize
+ XLOG_SIZE_OF_XLOG_RECORD
};
let mut remaining_lsn =
XLOG_BLCKSZ - u64::from(client.pg_current_wal_insert_lsn()?) as usize % XLOG_BLCKSZ;
if remaining_lsn < base_wal_advance {
remaining_lsn += XLOG_BLCKSZ;
let after_lsn = client.pg_current_wal_insert_lsn()?;
// Did the record cross a page boundary? If it did, start over. Crossing a
// page boundary adds to the apparent size of the record because of the page
// header, which throws off the calculation.
if u64::from(before_lsn) / XLOG_BLCKSZ as u64
!= u64::from(after_lsn) / XLOG_BLCKSZ as u64
{
continue;
}
// base_size is the size of a logical message without the payload
let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;
// Is there enough space on the page for another logical message and an
// XLOG_SWITCH? If not, start over.
let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
if page_remain < base_size - XLOG_SIZE_OF_XLOG_RECORD as u64 {
continue;
}
// We will write another logical message, such that after the logical message
// record, there will be space for exactly one XLOG_SWITCH. How large should
// the logical message's payload be? An XLOG_SWITCH record has no data => its
// size is exactly XLOG_SIZE_OF_XLOG_RECORD.
let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;
client.execute(
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
&[&(repeats as i32)],
)?;
break;
}
let repeats = 10 + remaining_lsn - base_wal_advance;
info!(
"current_wal_insert_lsn={}, remaining_lsn={}, base_wal_advance={}, repeats={}",
client.pg_current_wal_insert_lsn()?,
remaining_lsn,
base_wal_advance,
repeats
);
client.execute(
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
&[&(repeats as i32)],
)?;
info!(
"current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
client.pg_current_wal_insert_lsn()?,

View File

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
pub struct RateLimit {
last: Option<Instant>,
interval: Duration,
rate_limited: u32,
}
impl RateLimit {
@@ -12,9 +13,16 @@ impl RateLimit {
Self {
last: None,
interval,
rate_limited: 0,
}
}
/// Returns the number of calls that were rate limited
/// since the last one was allowed.
pub fn rate_limited(&self) -> u32 {
self.rate_limited
}
/// Call `f` if the rate limit allows.
/// Don't call it otherwise.
pub fn call<F: FnOnce()>(&mut self, f: F) {
@@ -22,9 +30,13 @@ impl RateLimit {
match self.last {
Some(last) if now - last <= self.interval => {
// ratelimit
if let Some(updated) = self.rate_limited.checked_add(1) {
self.rate_limited = updated;
}
}
_ => {
self.last = Some(now);
self.rate_limited = 0;
f();
}
}
@@ -50,17 +62,24 @@ mod tests {
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 0);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 1);
f.call(cl);
assert_eq!(called.load(Relaxed), 1);
assert_eq!(f.rate_limited(), 2);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
assert_eq!(f.rate_limited(), 0);
f.call(cl);
assert_eq!(called.load(Relaxed), 2);
assert_eq!(f.rate_limited(), 1);
f.call(cl);
std::thread::sleep(Duration::from_millis(100));
f.call(cl);
assert_eq!(called.load(Relaxed), 3);
assert_eq!(f.rate_limited(), 0);
}
}

View File

@@ -192,6 +192,14 @@ impl<T> OnceCell<T> {
}
}
/// Like [`Guard::take_and_deinit`], but will return `None` if this OnceCell was never
/// initialized.
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let inner = self.inner.get_mut().unwrap();
inner.take_and_deinit()
}
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
@@ -246,15 +254,23 @@ impl<'a, T> Guard<'a, T> {
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(mut self) -> (T, InitPermit) {
self.0
.take_and_deinit()
.expect("guard is not created unless value has been initialized")
}
}
impl<T> Inner<T> {
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let value = self.value.take()?;
let mut swapped = Inner::default();
let sem = swapped.init_semaphore.clone();
// acquire and forget right away, moving the control over to InitPermit
sem.try_acquire().expect("we just created this").forget();
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(sem)))
.expect("guard is not created unless value has been initialized")
let permit = InitPermit(sem);
std::mem::swap(self, &mut swapped);
Some((value, permit))
}
}
@@ -263,6 +279,13 @@ impl<'a, T> Guard<'a, T> {
/// On drop, this type will return the permit.
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
impl std::fmt::Debug for InitPermit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ptr = Arc::as_ptr(&self.0) as *const ();
f.debug_tuple("InitPermit").field(&ptr).finish()
}
}
impl Drop for InitPermit {
fn drop(&mut self) {
assert_eq!(
@@ -559,4 +582,22 @@ mod tests {
assert_eq!(*target.get().unwrap(), 11);
}
#[tokio::test]
async fn take_and_deinit_on_mut() {
use std::convert::Infallible;
let mut target = OnceCell::<u32>::default();
assert!(target.take_and_deinit().is_none());
target
.get_or_init(|permit| async move { Ok::<_, Infallible>((42, permit)) })
.await
.unwrap();
let again = target.take_and_deinit();
assert!(matches!(again, Some((42, _))), "{again:?}");
assert!(target.take_and_deinit().is_none());
}
}

View File

@@ -9,18 +9,45 @@
//! Coordinates in both axis are compressed for better readability.
//! (see <https://medium.com/algorithms-digest/coordinate-compression-2fff95326fb>)
//!
//! Example use:
//! The plain text API was chosen so that we can easily work with filenames from various
//! sources; see the Usage section below for examples.
//!
//! # Usage
//!
//! ## Producing the SVG
//!
//! ```bash
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! $ firefox out.svg
//!
//! # local timeline dir
//! ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//!
//! # Layer map dump from `/v1/tenant/$TENANT/timeline/$TIMELINE/layer`
//! (jq -r '.historic_layers[] | .layer_file_name' | cargo run -p pagectl draw-timeline) < layer-map.json > out.svg
//!
//! # From an `index_part.json` in S3
//! (jq -r '.layer_metadata | keys[]' | cargo run -p pagectl draw-timeline ) < index_part.json-00000016 > out.svg
//!
//! ```
//!
//! This API was chosen so that we can easily work with filenames extracted from ssh,
//! or from pageserver log files.
//! ## Viewing
//!
//! TODO Consider shipping this as a grafana panel plugin:
//! <https://grafana.com/tutorials/build-a-panel-plugin/>
//! **Inkscape** is better than the built-in viewers in browsers.
//!
//! After selecting a layer file rectangle, use "Open XML Editor" (Ctrl|Cmd + Shift + X)
//! to see the layer file name in the comment field.
//!
//! ```bash
//!
//! # Linux
//! inkscape out.svg
//!
//! # macOS
//! /Applications/Inkscape.app/Contents/MacOS/inkscape out.svg
//!
//! ```
//!
use anyhow::Result;
use pageserver::repository::Key;
use pageserver::METADATA_FILE_NAME;
@@ -65,7 +92,12 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
pub fn main() -> Result<()> {
// Parse layer filenames from stdin
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
struct Layer {
filename: String,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
}
let mut files: Vec<Layer> = vec![];
let stdin = io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
@@ -76,14 +108,23 @@ pub fn main() -> Result<()> {
// Don't try and parse "metadata" like a key-lsn range
continue;
}
let range = parse_filename(filename);
ranges.push(range);
let (key_range, lsn_range) = parse_filename(filename);
files.push(Layer {
filename: filename.to_owned(),
key_range,
lsn_range,
});
}
// Collect all coordinates
let mut keys: Vec<Key> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for (keyr, lsnr) in &ranges {
for Layer {
key_range: keyr,
lsn_range: lsnr,
..
} in &files
{
keys.push(keyr.start);
keys.push(keyr.end);
lsns.push(lsnr.start);
@@ -107,7 +148,12 @@ pub fn main() -> Result<()> {
h: stretch * lsn_map.len() as f32
}
);
for (keyr, lsnr) in &ranges {
for Layer {
filename,
key_range: keyr,
lsn_range: lsnr,
} in &files
{
let key_start = *key_map.get(&keyr.start).unwrap();
let key_end = *key_map.get(&keyr.end).unwrap();
let key_diff = key_end - key_start;
@@ -151,6 +197,7 @@ pub fn main() -> Result<()> {
.fill(fill)
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))
.border_radius(0.4)
.comment(filename)
);
}
println!("{}", EndSvg);

View File

@@ -13,7 +13,7 @@
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{key_to_slru_block, Key};
use pageserver_api::key::{key_to_slru_block, rel_block_to_key, Key};
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
@@ -297,7 +297,20 @@ where
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
if let Err(_e) = self.add_rel(rel, rel).await {
if self
.timeline
.get_shard_identity()
.is_key_buggy_forknum(&rel_block_to_key(rel, 0x0))
{
// Workaround https://github.com/neondatabase/neon/issues/7451 -- if we have an unlogged relation
// whose INIT_FORKNUM is not correctly on shard zero, then omit it in the basebackup. This allows
// postgres to start up. The relation won't work, but it will be possible to DROP TABLE on it and
// recreate.
tracing::warn!("Omitting relation {rel} for issue #7451: drop and recreate this unlogged relation");
continue;
}
};
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}

View File

@@ -160,6 +160,9 @@ impl From<PageReconstructError> for ApiError {
fn from(pre: PageReconstructError) -> ApiError {
match pre {
PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
PageReconstructError::MissingKey(e) => {
ApiError::InternalServerError(anyhow::anyhow!("{e}"))
}
PageReconstructError::Cancelled => {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}

View File

@@ -86,11 +86,20 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_read_num_fs_layers",
"Number of persistent layers accessed for processing a read request, including those in the cache",
vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 10.0, 20.0, 50.0, 100.0],
"pageserver_layers_visited_per_read_global",
"Number of layers visited to reconstruct one key",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric")
});
pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_layers_visited_per_vectored_read_global",
"Average number of layers visited to reconstruct one key",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric")
});
@@ -1518,7 +1527,8 @@ pub(crate) struct SecondaryModeMetrics {
pub(crate) download_heatmap: IntCounter,
pub(crate) download_layer: IntCounter,
}
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
SecondaryModeMetrics {
upload_heatmap: register_int_counter!(
"pageserver_secondary_upload_heatmap",
"Number of heatmaps written to remote storage by attached tenants"
@@ -1536,7 +1546,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
.expect("failed to define a metric"),
download_heatmap: register_int_counter!(
"pageserver_secondary_download_heatmap",
"Number of downloads of heatmaps by secondary mode locations"
"Number of downloads of heatmaps by secondary mode locations, including when it hasn't changed"
)
.expect("failed to define a metric"),
download_layer: register_int_counter!(
@@ -1544,6 +1554,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
"Number of downloads of layers by secondary mode locations"
)
.expect("failed to define a metric"),
}
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -2769,7 +2780,8 @@ pub fn preinitialize_metrics() {
// histograms
[
&READ_NUM_FS_LAYERS,
&READ_NUM_LAYERS_VISITED,
&VEC_READ_NUM_LAYERS_VISITED,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,

View File

@@ -1446,10 +1446,14 @@ impl<'a> DatadirModification<'a> {
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey { .. },
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {

View File

@@ -33,6 +33,52 @@ impl Value {
}
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
TooShortValue,
TooShortPostgresRecord,
}
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, lets
/// use this type for querying if a slice looks some particular way.
#[cfg(test)]
pub(crate) struct ValueBytes;
#[cfg(test)]
impl ValueBytes {
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
if value_discriminator != [0, 0, 0, 1] {
// not a Value::WalRecord(..)
return Ok(false);
}
let walrecord_discriminator = &raw[4..8];
if walrecord_discriminator != [0, 0, 0, 0] {
// only NeonWalRecord::Postgres can have will_init
return Ok(false);
}
if raw.len() < 17 {
return Err(InvalidInput::TooShortPostgresRecord);
}
Ok(raw[8] == 1)
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -70,6 +116,8 @@ mod test {
];
roundtrip!(image, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
@@ -93,6 +141,96 @@ mod test {
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
fn bytes_inspection_too_short_image() {
let rec = Value::Image(Bytes::from_static(b""));
#[rustfmt::skip]
let expected = [
// top level discriminator of 4 bytes
0x00, 0x00, 0x00, 0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 12);
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
);
}
}
#[test]
fn bytes_inspection_too_short_postgres_record() {
let rec = NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b""),
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// flattened discriminator of total 8 bytes
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
// will_init
0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 17);
for len in 12..17 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortPostgresRecord
)
}
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
)
}
}
#[test]
fn clear_visibility_map_flags_example() {
let rec = NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: Some(0x11),
old_heap_blkno: None,
flags: 0x03,
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// discriminators
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01,
// Some == 1 followed by 4 bytes
0x01, 0x00, 0x00, 0x00, 0x11,
// None == 0
0x00,
// flags
0x03
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
}
}

View File

@@ -559,9 +559,10 @@ impl Tenant {
// By doing what we do here, the index part upload is retried.
// If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here.
// FIXME: this branch should be dead code as we no longer write local metadata.
let rtc = timeline.remote_client.as_ref().unwrap();
rtc.init_upload_queue_for_empty_remote(&metadata)?;
rtc.schedule_index_upload_for_metadata_update(&metadata)?;
rtc.schedule_index_upload_for_full_metadata_update(&metadata)?;
}
timeline
@@ -3027,7 +3028,7 @@ impl Tenant {
// See also https://github.com/neondatabase/neon/issues/3865
if let Some(remote_client) = new_timeline.remote_client.as_ref() {
remote_client
.schedule_index_upload_for_metadata_update(&metadata)
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
}
@@ -3848,6 +3849,8 @@ pub(crate) mod harness {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
@@ -3858,7 +3861,7 @@ mod tests {
use hex_literal::hex;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tests::timeline::ShutdownMode;
use tests::timeline::{GetVectoredError, ShutdownMode};
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
@@ -4794,6 +4797,166 @@ mod tests {
Ok(())
}
// Test that vectored get descends into ancestor timelines correctly and
// does not return an image that's newer than requested.
//
// The diagram below ilustrates an interesting case. We have a parent timeline
// (top of the Lsn range) and a child timeline. The request key cannot be reconstructed
// from the child timeline, so the parent timeline must be visited. When advacing into
// the child timeline, the read path needs to remember what the requested Lsn was in
// order to avoid returning an image that's too new. The test below constructs such
// a timeline setup and does a few queries around the Lsn of each page image.
// ```
// LSN
// ^
// |
// |
// 500 | --------------------------------------> branch point
// 400 | X
// 300 | X
// 200 | --------------------------------------> requested lsn
// 100 | X
// |---------------------------------------> Key
// |
// ------> requested key
//
// Legend:
// * X - page images
// ```
#[tokio::test]
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
let (tenant, ctx) = harness.load().await;
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_key = start_key.add(1000);
let child_gap_at_key = start_key.add(500);
let mut parent_gap_lsns: BTreeMap<Lsn, String> = BTreeMap::new();
let mut current_lsn = Lsn(0x10);
let timeline_id = TimelineId::generate();
let parent_timeline = tenant
.create_test_timeline(timeline_id, current_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
current_lsn += 0x100;
for _ in 0..3 {
let mut key = start_key;
while key < end_key {
current_lsn += 0x10;
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
let mut writer = parent_timeline.writer().await;
writer
.put(
key,
current_lsn,
&Value::Image(test_img(&image_value)),
&ctx,
)
.await?;
writer.finish_write(current_lsn);
if key == child_gap_at_key {
parent_gap_lsns.insert(current_lsn, image_value);
}
key = key.next();
}
parent_timeline.freeze_and_flush().await?;
}
let child_timeline_id = TimelineId::generate();
let child_timeline = tenant
.branch_timeline_test(&parent_timeline, child_timeline_id, Some(current_lsn), &ctx)
.await?;
let mut key = start_key;
while key < end_key {
if key == child_gap_at_key {
key = key.next();
continue;
}
current_lsn += 0x10;
let mut writer = child_timeline.writer().await;
writer
.put(
key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
&ctx,
)
.await?;
writer.finish_write(current_lsn);
key = key.next();
}
child_timeline.freeze_and_flush().await?;
let lsn_offsets: [i64; 5] = [-10, -1, 0, 1, 10];
let mut query_lsns = Vec::new();
for image_lsn in parent_gap_lsns.keys().rev() {
for offset in lsn_offsets {
query_lsns.push(Lsn(image_lsn
.0
.checked_add_signed(offset)
.expect("Shouldn't overflow")));
}
}
for query_lsn in query_lsns {
let results = child_timeline
.get_vectored_impl(
KeySpace {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
&ctx,
)
.await;
let expected_item = parent_gap_lsns
.iter()
.rev()
.find(|(lsn, _)| **lsn <= query_lsn);
info!(
"Doing vectored read at LSN {}. Expecting image to be: {:?}",
query_lsn, expected_item
);
match expected_item {
Some((_, img_value)) => {
let key_results = results.expect("No vectored get error expected");
let key_result = &key_results[&child_gap_at_key];
let returned_img = key_result
.as_ref()
.expect("No page reconstruct error expected");
info!(
"Vectored read at LSN {} returned image {}",
query_lsn,
std::str::from_utf8(returned_img)?
);
assert_eq!(*returned_img, test_img(img_value));
}
None => {
assert!(matches!(results, Err(GetVectoredError::MissingKey(_))));
}
}
}
Ok(())
}
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_random_updates")?;

View File

@@ -235,6 +235,12 @@ impl TimelineMetadata {
let bytes = instance.to_bytes().unwrap();
Self::from_bytes(&bytes).unwrap()
}
pub(crate) fn apply(&mut self, update: &MetadataUpdate) {
self.body.disk_consistent_lsn = update.disk_consistent_lsn;
self.body.prev_record_lsn = update.prev_record_lsn;
self.body.latest_gc_cutoff_lsn = update.latest_gc_cutoff_lsn;
}
}
impl<'de> Deserialize<'de> for TimelineMetadata {
@@ -259,6 +265,27 @@ impl Serialize for TimelineMetadata {
}
}
/// Parts of the metadata which are regularly modified.
pub(crate) struct MetadataUpdate {
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
}
impl MetadataUpdate {
pub(crate) fn new(
disk_consistent_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
latest_gc_cutoff_lsn: Lsn,
) -> Self {
Self {
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cutoff_lsn,
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -678,12 +678,19 @@ pub async fn init_tenant_mgr(
}
}
}
LocationMode::Secondary(secondary_conf) => TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
)),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
"Starting secondary tenant"
);
TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
))
}
};
tenants.insert(tenant_shard_id, slot);

View File

@@ -236,6 +236,7 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
@@ -536,9 +537,10 @@ impl RemoteTimelineClient {
// Upload operations.
//
///
/// Launch an index-file upload operation in the background, with
/// updated metadata.
/// fully updated metadata.
///
/// This should only be used to upload initial metadata to remote storage.
///
/// The upload will be added to the queue immediately, but it
/// won't be performed until all previously scheduled layer file
@@ -550,7 +552,7 @@ impl RemoteTimelineClient {
/// If there were any changes to the list of files, i.e. if any
/// layer file uploads were scheduled, since the last index file
/// upload, those will be included too.
pub fn schedule_index_upload_for_metadata_update(
pub fn schedule_index_upload_for_full_metadata_update(
self: &Arc<Self>,
metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
@@ -566,6 +568,27 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, with only parts of the metadata
/// updated.
///
/// This is the regular way of updating metadata on layer flushes or Gc.
///
/// Using this lighter update mechanism allows for reparenting and detaching without changes to
/// `index_part.json`, while being more clear on what values update regularly.
pub(crate) fn schedule_index_upload_for_metadata_update(
self: &Arc<Self>,
update: &MetadataUpdate,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.latest_metadata.apply(update);
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
Ok(())
}
///
/// Launch an index-file upload operation in the background, if necessary.
///
@@ -2024,7 +2047,7 @@ mod tests {
// Schedule upload of index. Check that it is queued
let metadata = dummy_metadata(Lsn(0x20));
client
.schedule_index_upload_for_metadata_update(&metadata)
.schedule_index_upload_for_full_metadata_update(&metadata)
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();

View File

@@ -312,7 +312,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(detail.last_download, detail.next_download.unwrap())
};
if now < next_download {
if now > next_download {
Some(PendingDownload {
secondary_state: secondary_tenant,
last_download,
@@ -647,6 +647,12 @@ impl<'a> TenantDownloader<'a> {
progress.bytes_downloaded += layer_byte_count;
progress.layers_downloaded += layer_count;
}
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
}
}
// Execute accumulated deletions
@@ -710,13 +716,14 @@ impl<'a> TenantDownloader<'a> {
.await
.map_err(UpdateError::from)?;
SECONDARY_MODE.download_heatmap.inc();
if Some(&download.etag) == prev_etag {
Ok(HeatMapDownload::Unmodified)
} else {
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
SECONDARY_MODE.download_heatmap.inc();
Ok(HeatMapDownload::Modified(HeatMapModified {
etag: download.etag,
last_modified: download.last_modified,

View File

@@ -118,6 +118,7 @@ pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
keys_done: KeySpaceRandomAccum,
layers_visited: u32,
}
impl ValuesReconstructState {
@@ -125,6 +126,7 @@ impl ValuesReconstructState {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
layers_visited: 0,
}
}
@@ -138,6 +140,14 @@ impl ValuesReconstructState {
}
}
pub(crate) fn on_layer_visited(&mut self) {
self.layers_visited += 1;
}
pub(crate) fn get_layers_visited(&self) -> u32 {
self.layers_visited
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///

View File

@@ -20,8 +20,8 @@
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//! ```
//!
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
//! Every delta file consists of three parts: "summary", "values", and
//! "index". The summary is a fixed size header at the beginning of the file,
//! and it contains basic information about the layer, and offsets to the other
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
//! "values" part. The actual page images and WAL records are stored in the
@@ -728,6 +728,9 @@ impl DeltaLayerInner {
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
// mask out the timeline_id, but still require the layers to be from the same tenant
expected_summary.timeline_id = actual_summary.timeline_id;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
@@ -863,7 +866,7 @@ impl DeltaLayerInner {
.into(),
);
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
keyspace,
@@ -1103,11 +1106,195 @@ impl DeltaLayerInner {
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
last.size = self.index_start_offset() - last.size;
}
Ok(all_keys)
}
/// Using the given writer, write out a truncated version, where LSNs higher than the
/// truncate_at are missing.
#[cfg(test)]
pub(super) async fn copy_prefix(
&self,
writer: &mut DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
use futures::stream::TryStreamExt;
#[derive(Debug)]
enum Item {
Actual(Key, Lsn, BlobRef),
Sentinel,
}
impl From<Item> for Option<(Key, Lsn, BlobRef)> {
fn from(value: Item) -> Self {
match value {
Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
Item::Sentinel => None,
}
}
}
impl Item {
fn offset(&self) -> Option<BlobRef> {
match self {
Item::Actual(_, _, blob) => Some(*blob),
Item::Sentinel => None,
}
}
fn is_last(&self) -> bool {
matches!(self, Item::Sentinel)
}
}
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
// put in a sentinel value for getting the end offset for last item, and not having to
// repeat the whole read part
let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
Item::Sentinel,
))));
let mut stream = std::pin::pin!(stream);
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<VectoredReadBuilder> = None;
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
while let Some(item) = stream.try_next().await? {
tracing::debug!(?item, "popped");
let offset = item
.offset()
.unwrap_or(BlobRef::new(self.index_start_offset(), false));
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
} else {
None
};
let is_last = item.is_last();
prev = Option::from(item);
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
let builder = if let Some((meta, offsets)) = actionable {
// extend or create a new builder
if read_builder
.as_mut()
.map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
.unwrap_or(VectoredReadExtended::No)
== VectoredReadExtended::Yes
{
None
} else {
read_builder.replace(VectoredReadBuilder::new(
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
))
}
} else {
// nothing to do, except perhaps flush any existing for the last element
None
};
// flush the possible older builder and also the new one if the item was the last one
let builders = builder.into_iter();
let builders = if is_last {
builders.chain(read_builder.take())
} else {
builders.chain(None)
};
for builder in builders {
let read = builder.build();
let reader = VectoredBlobReader::new(&self.file);
let mut buf = buffer.take().unwrap();
buf.clear();
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf).await?;
for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = &res.buf[blob.start..blob.end];
#[cfg(debug_assertions)]
Value::des(data)
.with_context(|| {
format!(
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
)
})
.unwrap();
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
per_blob_copy.clear();
per_blob_copy.extend_from_slice(data);
let (tmp, res) = writer
.put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
.await;
per_blob_copy = tmp;
res?;
}
buffer = Some(res.buf);
}
}
assert!(
read_builder.is_none(),
"with the sentinel above loop should had handled all"
);
Ok(())
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
@@ -1177,6 +1364,44 @@ impl DeltaLayerInner {
Ok(())
}
#[cfg(test)]
fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
start: &'a [u8; DELTA_KEY_SIZE],
ctx: &'a RequestContext,
) -> impl futures::stream::Stream<
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
> + 'a
where
R: BlockReader,
{
use futures::stream::TryStreamExt;
let stream = reader.get_stream_from(start, ctx);
stream.map_ok(|(key, value)| {
let key = DeltaKey::from_slice(&key);
let (key, lsn) = (key.key(), key.lsn());
let offset = BlobRef(value);
(key, lsn, offset)
})
}
/// The file offset to the first block of index.
///
/// The file structure is summary, values, and index. We often need this for the size of last blob.
fn index_start_offset(&self) -> u64 {
let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let bref = BlobRef(offset);
tracing::debug!(
index_start_blk = self.index_start_blk,
offset,
pos = bref.pos(),
"index_start_offset"
);
offset
}
}
/// A set of data associated with a delta layer key and its value
@@ -1538,7 +1763,7 @@ mod test {
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
let inner = resident.get_inner_delta(&ctx).await?;
let inner = resident.as_delta(&ctx).await?;
let file_size = inner.file.metadata().await?.len();
tracing::info!(
@@ -1594,4 +1819,217 @@ mod test {
Ok(())
}
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
.await
.unwrap();
let initdb_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.next()
.unwrap();
{
let mut writer = timeline.writer().await;
let data = [
(0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
(
0x30,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b"1"),
}),
),
(
0x40,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"2"),
}),
),
// build an oversized value so we cannot extend and existing read over
// this
(
0x50,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: {
let mut buf =
vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
buf.iter_mut()
.enumerate()
.for_each(|(i, slot)| *slot = (i % 256) as u8);
Bytes::from(buf)
},
}),
),
// because the oversized read cannot be extended further, we are sure to exercise the
// builder created on the last round with this:
(
0x60,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"3"),
}),
),
(
0x60,
9,
Value::Image(Bytes::from_static(b"something for a different key")),
),
];
let mut last_lsn = None;
for (lsn, key, value) in data {
let key = Key::from_i128(key);
writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
last_lsn = Some(lsn);
}
writer.finish_write(Lsn(last_lsn.unwrap()));
}
timeline.freeze_and_flush().await.unwrap();
let new_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.find(|x| x != &initdb_layer)
.unwrap();
// create a copy for the timeline, so we don't overwrite the file
let branch = tenant
.branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
.await
.unwrap();
assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
// truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
// a single key
for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
let truncate_at = Lsn(truncate_at);
let mut writer = DeltaLayerWriter::new(
tenant.conf,
branch.timeline_id,
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
)
.await
.unwrap();
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
new_layer
.copy_delta_prefix(&mut writer, truncate_at, ctx)
.await
.unwrap();
let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
copied_layer.as_delta(ctx).await.unwrap();
assert_keys_and_values_eq(
new_layer.as_delta(ctx).await.unwrap(),
copied_layer.as_delta(ctx).await.unwrap(),
truncate_at,
ctx,
)
.await;
}
}
async fn assert_keys_and_values_eq(
source: &DeltaLayerInner,
truncated: &DeltaLayerInner,
truncated_at: Lsn,
ctx: &RequestContext,
) {
use futures::future::ready;
use futures::stream::TryStreamExt;
let start_key = [0u8; DELTA_KEY_SIZE];
let source_reader = FileBlockReader::new(&source.file, source.file_id);
let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
source.index_start_blk,
source.index_root_blk,
&source_reader,
);
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
let source_stream = source_stream.filter(|res| match res {
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
_ => ready(true),
});
let mut source_stream = std::pin::pin!(source_stream);
let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
truncated.index_start_blk,
truncated.index_root_blk,
&truncated_reader,
);
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
let mut truncated_stream = std::pin::pin!(truncated_stream);
let mut scratch_left = Vec::new();
let mut scratch_right = Vec::new();
loop {
let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
if src.is_none() {
assert!(truncated.is_none());
break;
}
let (src, truncated) = (src.unwrap(), truncated.unwrap());
// because we've filtered the source with Lsn, we should always have the same keys from both.
assert_eq!(src.0, truncated.0);
assert_eq!(src.1, truncated.1);
// if this is needed for something else, just drop this assert.
assert!(
src.2.pos() >= truncated.2.pos(),
"value position should not go backwards {} vs. {}",
src.2.pos(),
truncated.2.pos()
);
scratch_left.clear();
let src_cursor = source_reader.block_cursor();
let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
scratch_right.clear();
let trunc_cursor = truncated_reader.block_cursor();
let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
tokio::try_join!(left, right).unwrap();
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
}
}
}

View File

@@ -396,6 +396,8 @@ impl ImageLayerInner {
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
// mask out the timeline_id, but still require the layers to be from the same tenant
expected_summary.timeline_id = actual_summary.timeline_id;
if actual_summary != expected_summary {
bail!(

View File

@@ -26,7 +26,7 @@ use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// while being able to use std::fmt::Write's methods
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
use std::cmp::Ordering;
use std::fmt::Write as _;
use std::fmt::Write;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
@@ -54,6 +54,12 @@ pub struct InMemoryLayer {
/// Writes are only allowed when this is `None`.
end_lsn: OnceLock<Lsn>,
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
local_path_str: Arc<str>,
/// Used for traversal path. Cached representation of the in-memory layer after frozen.
frozen_local_path_str: OnceLock<Arc<str>>,
opened_at: Instant,
/// The above fields never change, except for `end_lsn`, which is only set once.
@@ -241,6 +247,12 @@ impl InMemoryLayer {
self.start_lsn..self.end_lsn_or_max()
}
pub(crate) fn local_path_str(&self) -> &Arc<str> {
self.frozen_local_path_str
.get()
.unwrap_or(&self.local_path_str)
}
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
@@ -430,10 +442,24 @@ impl InMemoryLayer {
}
}
fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
}
fn inmem_layer_log_display(
mut f: impl Write,
timeline: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> std::fmt::Result {
write!(f, "timeline {} in-memory ", timeline)?;
inmem_layer_display(f, start_lsn, end_lsn)
}
impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let end_lsn = self.end_lsn_or_max();
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
inmem_layer_display(f, self.start_lsn, end_lsn)
}
}
@@ -458,6 +484,12 @@ impl InMemoryLayer {
Ok(InMemoryLayer {
file_id: key,
local_path_str: {
let mut buf = String::new();
inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
buf.into()
},
frozen_local_path_str: OnceLock::new(),
conf,
timeline_id,
tenant_shard_id,
@@ -552,6 +584,15 @@ impl InMemoryLayer {
);
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
self.frozen_local_path_str
.set({
let mut buf = String::new();
inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
.unwrap();
buf.into()
})
.expect("frozen_local_path_str set only once");
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);

View File

@@ -116,6 +116,12 @@ impl AsLayerDesc for Layer {
}
}
impl PartialEq for Layer {
fn eq(&self, other: &Self) -> bool {
Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
}
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -389,6 +395,10 @@ impl Layer {
&self.0.path
}
pub(crate) fn local_path_str(&self) -> &Arc<str> {
&self.0.path_str
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -511,6 +521,9 @@ struct LayerInner {
/// Full path to the file; unclear if this should exist anymore.
path: Utf8PathBuf,
/// String representation of the full path, used for traversal id.
path_str: Arc<str>,
desc: PersistentLayerDesc,
/// Timeline access is needed for remote timeline client and metrics.
@@ -604,9 +617,17 @@ enum Status {
impl Drop for LayerInner {
fn drop(&mut self) {
// if there was a pending eviction, mark it cancelled here to balance metrics
if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
{
// eviction has already been started
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
// eviction request is intentionally not honored as no one is present to wait for it
// and we could be delaying shutdown for nothing.
}
if !*self.wanted_deleted.get_mut() {
// should we try to evict if the last wish was for eviction? seems more like a hazard
// than a clear win.
return;
}
@@ -708,6 +729,7 @@ impl LayerInner {
LayerInner {
conf,
path_str: path.to_string().into(),
path,
desc,
timeline: Arc::downgrade(timeline),
@@ -1552,8 +1574,8 @@ impl Drop for DownloadedLayer {
if let Some(owner) = self.owner.upgrade() {
owner.on_downloaded_layer_drop(self.version);
} else {
// no need to do anything, we are shutting down
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
// Layer::drop will handle cancelling the eviction; because of drop order and
// `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
}
}
}
@@ -1752,6 +1774,28 @@ impl ResidentLayer {
}
}
/// FIXME: truncate is bad name because we are not truncating anything, but copying the
/// filtered parts.
#[cfg(test)]
pub(super) async fn copy_delta_prefix(
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => d
.copy_prefix(writer, truncate_at, ctx)
.await
.with_context(|| format!("truncate {self}")),
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
}
}
pub(crate) fn local_path(&self) -> &Utf8Path {
&self.owner.0.path
}
@@ -1761,14 +1805,14 @@ impl ResidentLayer {
}
#[cfg(test)]
pub(crate) async fn get_inner_delta<'a>(
&'a self,
pub(crate) async fn as_delta(
&self,
ctx: &RequestContext,
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
LayerKind::Delta(d) => Ok(d),
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? {
Delta(ref d) => Ok(d),
Image(_) => Err(anyhow::anyhow!("image layer")),
}
}
}

View File

@@ -721,11 +721,110 @@ async fn evict_and_wait_does_not_wait_for_download() {
layer.evict_and_wait(FOREVER).await.unwrap();
}
/// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
/// which is the last value.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
use crate::repository::Value;
use bytes::Bytes;
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
{
// create_test_timeline wrote us one layer, write another
let mut writer = timeline.writer().await;
writer
.put(
Key::from_i128(5),
Lsn(0x20),
&Value::Image(Bytes::from_static(b"this does not matter either")),
&ctx,
)
.await
.unwrap();
writer.finish_write(Lsn(0x20));
}
timeline.freeze_and_flush().await.unwrap();
// wait for the upload to complete so our Arc::strong_count assertion holds
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
// remove the layers from layermap
guard.finish_gc_timeline(&layers);
layers
};
assert_eq!(layers.len(), 2);
(layers.pop().unwrap(), layers.pop().unwrap())
};
let victims = [(evicted_layer, true), (not_evicted, false)];
for (victim, evict) in victims {
let resident = victim.keep_resident().await.unwrap();
drop(victim);
assert_eq!(Arc::strong_count(&resident.owner.0), 1);
if evict {
let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
// drive the future to await on the status channel, and then drop it
tokio::time::timeout(ADVANCE, evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
}
// 1 == we only evict one of the layers
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
drop(resident);
// run any spawned
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
assert_eq!(
1,
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
);
}
}
/// A test case to remind you the cost of these structures. You can bump the size limit
/// below if it is really necessary to add more fields to the structures.
#[test]
fn layer_size() {
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
assert_eq!(std::mem::size_of::<LayerInner>(), 2328);
assert_eq!(std::mem::size_of::<LayerInner>(), 2344);
// it also has the utf8 path
}

View File

@@ -23,7 +23,7 @@ use pageserver_api::{
EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, TenantShardId},
shard::{ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use serde_with::serde_as;
@@ -428,6 +428,62 @@ pub(crate) enum PageReconstructError {
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(anyhow::Error),
#[error("{0}")]
MissingKey(MissingKeyError),
}
#[derive(Debug)]
pub struct MissingKeyError {
stuck_at_lsn: bool,
key: Key,
shard: ShardNumber,
cont_lsn: Lsn,
request_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
traversal_path: Vec<TraversalPathItem>,
backtrace: Option<std::backtrace::Backtrace>,
}
impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.stuck_at_lsn {
// Records are found in this timeline but no image layer or initial delta record was found.
write!(
f,
"could not find layer with more data for key {} (shard {:?}) at LSN {}, request LSN {}",
self.key, self.shard, self.cont_lsn, self.request_lsn
)?;
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
write!(f, ", ancestor {}", ancestor_lsn)?;
}
} else {
// No records in this timeline.
write!(
f,
"could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
self.key, self.shard, self.cont_lsn, self.request_lsn
)?;
}
if !self.traversal_path.is_empty() {
writeln!(f)?;
}
for (r, c, l) in &self.traversal_path {
writeln!(
f,
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
r, c, l,
)?;
}
if let Some(ref backtrace) = self.backtrace {
write!(f, "\n{}", backtrace)?;
}
Ok(())
}
}
impl PageReconstructError {
@@ -439,6 +495,7 @@ impl PageReconstructError {
AncestorLsnTimeout(_) => false,
Cancelled | AncestorStopping(_) => true,
WalRedo(_) => false,
MissingKey { .. } => false,
}
}
}
@@ -753,7 +810,7 @@ impl Timeline {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer(),
layer,
)
.expect("string grows")
});
@@ -767,6 +824,10 @@ impl Timeline {
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
// If we are visiting more than 50 layers to reconstruct one value,
// it's likely that something is wrong with compaction. We print a
// rate limited warning in that case.
pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u64 = 50;
/// Look up multiple page versions at a given LSN
///
@@ -872,9 +933,11 @@ impl Timeline {
Err(Cancelled | AncestorStopping(_)) => {
return Err(GetVectoredError::Cancelled)
}
Err(Other(err)) if err.to_string().contains("could not find data for key") => {
return Err(GetVectoredError::MissingKey(key))
}
// we only capture stuck_at_lsn=false now until we figure out https://github.com/neondatabase/neon/issues/7380
Err(MissingKey(MissingKeyError {
stuck_at_lsn: false,
..
})) => return Err(GetVectoredError::MissingKey(key)),
_ => {
values.insert(key, block);
key = key.next();
@@ -898,6 +961,7 @@ impl Timeline {
.await?;
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
@@ -912,6 +976,25 @@ impl Timeline {
}
}
// Note that this is an approximation. Tracking the exact number of layers visited
// per key requires virtually unbounded memory usage and is inefficient
// (i.e. segment tree tracking each range queried from a layer)
let average_layers_visited = layers_visited as f64 / results.len() as f64;
crate::metrics::VEC_READ_NUM_LAYERS_VISITED.observe(average_layers_visited);
if average_layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD as f64 {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
let rate_limited = rate_limit.rate_limited();
rate_limit.call(|| {
warn!("Too many layers visited by vectored read: {} >= {}; rate limited {} occurences",
average_layers_visited, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
});
}
Ok(results)
}
@@ -2692,7 +2775,7 @@ impl Timeline {
}
}
type TraversalId = String;
type TraversalId = Arc<str>;
trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
@@ -2700,13 +2783,13 @@ trait TraversalLayerExt {
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
self.local_path().to_string()
Arc::clone(self.local_path_str())
}
}
impl TraversalLayerExt for Arc<InMemoryLayer> {
fn traversal_id(&self) -> TraversalId {
format!("timeline {} in-memory {self}", self.get_timeline_id())
Arc::clone(self.local_path_str())
}
}
@@ -2735,7 +2818,18 @@ impl Timeline {
let mut timeline = self;
let mut read_count = scopeguard::guard(0, |cnt| {
crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64);
if cnt >= Self::LAYERS_VISITED_WARN_THRESHOLD {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
let rate_limited = rate_limit.rate_limited();
rate_limit.call(|| {
warn!("Too many layers visited by non-vectored read: {} >= {}; rate limited {} occurences",
cnt, Self::LAYERS_VISITED_WARN_THRESHOLD, rate_limited);
});
}
});
// For debugging purposes, collect the path of layers that we traversed
@@ -2775,32 +2869,35 @@ impl Timeline {
if prev <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return Err(layer_traversal_error(format!(
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
return Err(PageReconstructError::MissingKey(MissingKeyError {
stuck_at_lsn: true,
key,
Lsn(cont_lsn.0 - 1),
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path));
ancestor_lsn: Some(timeline.ancestor_lsn),
traversal_path,
backtrace: None,
}));
}
}
prev_lsn = Some(cont_lsn);
}
ValueReconstructResult::Missing => {
return Err(layer_traversal_error(
if cfg!(test) {
format!(
"could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
)
} else {
format!(
"could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
)
},
return Err(PageReconstructError::MissingKey(MissingKeyError {
stuck_at_lsn: false,
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn,
request_lsn,
ancestor_lsn: None,
traversal_path,
));
backtrace: if cfg!(test) {
Some(std::backtrace::Backtrace::force_capture())
} else {
None
},
}));
}
}
@@ -2847,12 +2944,8 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
traversal_path.push((
result,
cont_lsn,
Box::new(move || open_layer.traversal_id()),
));
*read_count += 1;
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
continue 'outer;
}
}
@@ -2878,12 +2971,8 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
traversal_path.push((
result,
cont_lsn,
Box::new(move || frozen_layer.traversal_id()),
));
*read_count += 1;
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
continue 'outer;
}
}
@@ -2904,14 +2993,7 @@ impl Timeline {
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((
result,
cont_lsn,
Box::new({
let layer = layer.to_owned();
move || layer.traversal_id()
}),
));
traversal_path.push((result, cont_lsn, layer.traversal_id()));
continue 'outer;
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2968,7 +3050,8 @@ impl Timeline {
break;
}
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
.get_ready_ancestor_timeline(ctx)
.await
@@ -3081,6 +3164,8 @@ impl Timeline {
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited();
} else {
break;
}
@@ -3524,7 +3609,7 @@ impl Timeline {
&self,
disk_consistent_lsn: Lsn,
layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
) -> anyhow::Result<TimelineMetadata> {
) -> anyhow::Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -3541,19 +3626,10 @@ impl Timeline {
None
};
let ancestor_timeline_id = self
.ancestor_timeline
.as_ref()
.map(|ancestor| ancestor.timeline_id);
let metadata = TimelineMetadata::new(
let update = crate::tenant::metadata::MetadataUpdate::new(
disk_consistent_lsn,
ondisk_prev_record_lsn,
ancestor_timeline_id,
self.ancestor_lsn,
*self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn,
self.pg_version,
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -3565,10 +3641,10 @@ impl Timeline {
for layer in layers_to_upload {
remote_client.schedule_layer_file_upload(layer)?;
}
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
remote_client.schedule_index_upload_for_metadata_update(&update)?;
}
Ok(metadata)
Ok(())
}
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
@@ -4664,35 +4740,7 @@ impl Timeline {
}
}
type TraversalPathItem = (
ValueReconstructResult,
Lsn,
Box<dyn Send + FnOnce() -> TraversalId>,
);
/// Helper function for get_reconstruct_data() to add the path of layers traversed
/// to an error, as anyhow context information.
fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
// We want the original 'msg' to be the outermost context. The outermost context
// is the most high-level information, which also gets propagated to the client.
let mut msg_iter = path
.into_iter()
.map(|(r, c, l)| {
format!(
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
r,
c,
l(),
)
})
.chain(std::iter::once(msg));
// Construct initial message from the first traversed layer
let err = anyhow!(msg_iter.next().unwrap());
// Append all subsequent traversals, and the error message 'msg', as contexts.
let msg = msg_iter.fold(err, |err, msg| err.context(msg));
PageReconstructError::from(msg)
}
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,

View File

@@ -61,18 +61,18 @@ pub struct VectoredRead {
}
impl VectoredRead {
pub fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
#[derive(Eq, PartialEq)]
enum VectoredReadExtended {
pub(crate) enum VectoredReadExtended {
Yes,
No,
}
struct VectoredReadBuilder {
pub(crate) struct VectoredReadBuilder {
start: u64,
end: u64,
blobs_at: VecMap<u64, BlobMeta>,
@@ -80,7 +80,17 @@ struct VectoredReadBuilder {
}
impl VectoredReadBuilder {
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
/// Start building a new vectored read.
///
/// Note that by design, this does not check against reading more than `max_read_size` to
/// support reading larger blobs than the configuration value. The builder will be single use
/// however after that.
pub(crate) fn new(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
max_read_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
@@ -97,7 +107,8 @@ impl VectoredReadBuilder {
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resuting size is below the max read size
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
@@ -111,11 +122,11 @@ impl VectoredReadBuilder {
VectoredReadExtended::No
}
fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
fn build(self) -> VectoredRead {
pub(crate) fn build(self) -> VectoredRead {
VectoredRead {
start: self.start,
end: self.end,

View File

@@ -55,6 +55,7 @@ impl NeonWalRecord {
/// Does replaying this WAL record initialize the page from scratch, or does
/// it need to be applied over the previous image of the page?
pub fn will_init(&self) -> bool {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,

156
poetry.lock generated
View File

@@ -2,87 +2,87 @@
[[package]]
name = "aiohttp"
version = "3.9.2"
version = "3.9.4"
description = "Async http client/server framework (asyncio)"
optional = false
python-versions = ">=3.8"
files = [
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:772fbe371788e61c58d6d3d904268e48a594ba866804d08c995ad71b144f94cb"},
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:edd4f1af2253f227ae311ab3d403d0c506c9b4410c7fc8d9573dec6d9740369f"},
{file = "aiohttp-3.9.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cfee9287778399fdef6f8a11c9e425e1cb13cc9920fd3a3df8f122500978292b"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3cc158466f6a980a6095ee55174d1de5730ad7dec251be655d9a6a9dd7ea1ff9"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:54ec82f45d57c9a65a1ead3953b51c704f9587440e6682f689da97f3e8defa35"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:abeb813a18eb387f0d835ef51f88568540ad0325807a77a6e501fed4610f864e"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cc91d07280d7d169f3a0f9179d8babd0ee05c79d4d891447629ff0d7d8089ec2"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b65e861f4bebfb660f7f0f40fa3eb9f2ab9af10647d05dac824390e7af8f75b7"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:04fd8ffd2be73d42bcf55fd78cde7958eeee6d4d8f73c3846b7cba491ecdb570"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:3d8d962b439a859b3ded9a1e111a4615357b01620a546bc601f25b0211f2da81"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:8ceb658afd12b27552597cf9a65d9807d58aef45adbb58616cdd5ad4c258c39e"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:0e4ee4df741670560b1bc393672035418bf9063718fee05e1796bf867e995fad"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2dec87a556f300d3211decf018bfd263424f0690fcca00de94a837949fbcea02"},
{file = "aiohttp-3.9.2-cp310-cp310-win32.whl", hash = "sha256:3e1a800f988ce7c4917f34096f81585a73dbf65b5c39618b37926b1238cf9bc4"},
{file = "aiohttp-3.9.2-cp310-cp310-win_amd64.whl", hash = "sha256:ea510718a41b95c236c992b89fdfc3d04cc7ca60281f93aaada497c2b4e05c46"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:6aaa6f99256dd1b5756a50891a20f0d252bd7bdb0854c5d440edab4495c9f973"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a27d8c70ad87bcfce2e97488652075a9bdd5b70093f50b10ae051dfe5e6baf37"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:54287bcb74d21715ac8382e9de146d9442b5f133d9babb7e5d9e453faadd005e"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5bb3d05569aa83011fcb346b5266e00b04180105fcacc63743fc2e4a1862a891"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c8534e7d69bb8e8d134fe2be9890d1b863518582f30c9874ed7ed12e48abe3c4"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4bd9d5b989d57b41e4ff56ab250c5ddf259f32db17159cce630fd543376bd96b"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa6904088e6642609981f919ba775838ebf7df7fe64998b1a954fb411ffb4663"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bda42eb410be91b349fb4ee3a23a30ee301c391e503996a638d05659d76ea4c2"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:193cc1ccd69d819562cc7f345c815a6fc51d223b2ef22f23c1a0f67a88de9a72"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:b9f1cb839b621f84a5b006848e336cf1496688059d2408e617af33e3470ba204"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d22a0931848b8c7a023c695fa2057c6aaac19085f257d48baa24455e67df97ec"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:4112d8ba61fbd0abd5d43a9cb312214565b446d926e282a6d7da3f5a5aa71d36"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:c4ad4241b52bb2eb7a4d2bde060d31c2b255b8c6597dd8deac2f039168d14fd7"},
{file = "aiohttp-3.9.2-cp311-cp311-win32.whl", hash = "sha256:ee2661a3f5b529f4fc8a8ffee9f736ae054adfb353a0d2f78218be90617194b3"},
{file = "aiohttp-3.9.2-cp311-cp311-win_amd64.whl", hash = "sha256:4deae2c165a5db1ed97df2868ef31ca3cc999988812e82386d22937d9d6fed52"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:6f4cdba12539215aaecf3c310ce9d067b0081a0795dd8a8805fdb67a65c0572a"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:84e843b33d5460a5c501c05539809ff3aee07436296ff9fbc4d327e32aa3a326"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8008d0f451d66140a5aa1c17e3eedc9d56e14207568cd42072c9d6b92bf19b52"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:61c47ab8ef629793c086378b1df93d18438612d3ed60dca76c3422f4fbafa792"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bc71f748e12284312f140eaa6599a520389273174b42c345d13c7e07792f4f57"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a1c3a4d0ab2f75f22ec80bca62385db2e8810ee12efa8c9e92efea45c1849133"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a87aa0b13bbee025faa59fa58861303c2b064b9855d4c0e45ec70182bbeba1b"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e2cc0d04688b9f4a7854c56c18aa7af9e5b0a87a28f934e2e596ba7e14783192"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:1956e3ac376b1711c1533266dec4efd485f821d84c13ce1217d53e42c9e65f08"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:114da29f39eccd71b93a0fcacff178749a5c3559009b4a4498c2c173a6d74dff"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:3f17999ae3927d8a9a823a1283b201344a0627272f92d4f3e3a4efe276972fe8"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:f31df6a32217a34ae2f813b152a6f348154f948c83213b690e59d9e84020925c"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:7a75307ffe31329928a8d47eae0692192327c599113d41b278d4c12b54e1bd11"},
{file = "aiohttp-3.9.2-cp312-cp312-win32.whl", hash = "sha256:972b63d589ff8f305463593050a31b5ce91638918da38139b9d8deaba9e0fed7"},
{file = "aiohttp-3.9.2-cp312-cp312-win_amd64.whl", hash = "sha256:200dc0246f0cb5405c80d18ac905c8350179c063ea1587580e3335bfc243ba6a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:158564d0d1020e0d3fe919a81d97aadad35171e13e7b425b244ad4337fc6793a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:da1346cd0ccb395f0ed16b113ebb626fa43b7b07fd7344fce33e7a4f04a8897a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:eaa9256de26ea0334ffa25f1913ae15a51e35c529a1ed9af8e6286dd44312554"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1543e7fb00214fb4ccead42e6a7d86f3bb7c34751ec7c605cca7388e525fd0b4"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:186e94570433a004e05f31f632726ae0f2c9dee4762a9ce915769ce9c0a23d89"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d52d20832ac1560f4510d68e7ba8befbc801a2b77df12bd0cd2bcf3b049e52a4"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c45e4e815ac6af3b72ca2bde9b608d2571737bb1e2d42299fc1ffdf60f6f9a1"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa906b9bdfd4a7972dd0628dbbd6413d2062df5b431194486a78f0d2ae87bd55"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:68bbee9e17d66f17bb0010aa15a22c6eb28583edcc8b3212e2b8e3f77f3ebe2a"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:4c189b64bd6d9a403a1a3f86a3ab3acbc3dc41a68f73a268a4f683f89a4dec1f"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:8a7876f794523123bca6d44bfecd89c9fec9ec897a25f3dd202ee7fc5c6525b7"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d23fba734e3dd7b1d679b9473129cd52e4ec0e65a4512b488981a56420e708db"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b141753be581fab842a25cb319f79536d19c2a51995d7d8b29ee290169868eab"},
{file = "aiohttp-3.9.2-cp38-cp38-win32.whl", hash = "sha256:103daf41ff3b53ba6fa09ad410793e2e76c9d0269151812e5aba4b9dd674a7e8"},
{file = "aiohttp-3.9.2-cp38-cp38-win_amd64.whl", hash = "sha256:328918a6c2835861ff7afa8c6d2c70c35fdaf996205d5932351bdd952f33fa2f"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5264d7327c9464786f74e4ec9342afbbb6ee70dfbb2ec9e3dfce7a54c8043aa3"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:07205ae0015e05c78b3288c1517afa000823a678a41594b3fdc870878d645305"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ae0a1e638cffc3ec4d4784b8b4fd1cf28968febc4bd2718ffa25b99b96a741bd"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d43302a30ba1166325974858e6ef31727a23bdd12db40e725bec0f759abce505"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:16a967685907003765855999af11a79b24e70b34dc710f77a38d21cd9fc4f5fe"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6fa3ee92cd441d5c2d07ca88d7a9cef50f7ec975f0117cd0c62018022a184308"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b500c5ad9c07639d48615a770f49618130e61be36608fc9bc2d9bae31732b8f"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c07327b368745b1ce2393ae9e1aafed7073d9199e1dcba14e035cc646c7941bf"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:cc7d6502c23a0ec109687bf31909b3fb7b196faf198f8cff68c81b49eb316ea9"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:07be2be7071723c3509ab5c08108d3a74f2181d4964e869f2504aaab68f8d3e8"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:122468f6fee5fcbe67cb07014a08c195b3d4c41ff71e7b5160a7bcc41d585a5f"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:00a9abcea793c81e7f8778ca195a1714a64f6d7436c4c0bb168ad2a212627000"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7a9825fdd64ecac5c670234d80bb52bdcaa4139d1f839165f548208b3779c6c6"},
{file = "aiohttp-3.9.2-cp39-cp39-win32.whl", hash = "sha256:5422cd9a4a00f24c7244e1b15aa9b87935c85fb6a00c8ac9b2527b38627a9211"},
{file = "aiohttp-3.9.2-cp39-cp39-win_amd64.whl", hash = "sha256:7d579dcd5d82a86a46f725458418458fa43686f6a7b252f2966d359033ffc8ab"},
{file = "aiohttp-3.9.2.tar.gz", hash = "sha256:b0ad0a5e86ce73f5368a164c10ada10504bf91869c05ab75d982c6048217fbf7"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:76d32588ef7e4a3f3adff1956a0ba96faabbdee58f2407c122dd45aa6e34f372"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:56181093c10dbc6ceb8a29dfeea1e815e1dfdc020169203d87fd8d37616f73f9"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7a5b676d3c65e88b3aca41816bf72831898fcd73f0cbb2680e9d88e819d1e4d"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1df528a85fb404899d4207a8d9934cfd6be626e30e5d3a5544a83dbae6d8a7e"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f595db1bceabd71c82e92df212dd9525a8a2c6947d39e3c994c4f27d2fe15b11"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c0b09d76e5a4caac3d27752027fbd43dc987b95f3748fad2b924a03fe8632ad"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:689eb4356649ec9535b3686200b231876fb4cab4aca54e3bece71d37f50c1d13"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a3666cf4182efdb44d73602379a66f5fdfd5da0db5e4520f0ac0dcca644a3497"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b65b0f8747b013570eea2f75726046fa54fa8e0c5db60f3b98dd5d161052004a"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a1885d2470955f70dfdd33a02e1749613c5a9c5ab855f6db38e0b9389453dce7"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:0593822dcdb9483d41f12041ff7c90d4d1033ec0e880bcfaf102919b715f47f1"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:47f6eb74e1ecb5e19a78f4a4228aa24df7fbab3b62d4a625d3f41194a08bd54f"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c8b04a3dbd54de6ccb7604242fe3ad67f2f3ca558f2d33fe19d4b08d90701a89"},
{file = "aiohttp-3.9.4-cp310-cp310-win32.whl", hash = "sha256:8a78dfb198a328bfb38e4308ca8167028920fb747ddcf086ce706fbdd23b2926"},
{file = "aiohttp-3.9.4-cp310-cp310-win_amd64.whl", hash = "sha256:e78da6b55275987cbc89141a1d8e75f5070e577c482dd48bd9123a76a96f0bbb"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c111b3c69060d2bafc446917534150fd049e7aedd6cbf21ba526a5a97b4402a5"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:efbdd51872cf170093998c87ccdf3cb5993add3559341a8e5708bcb311934c94"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7bfdb41dc6e85d8535b00d73947548a748e9534e8e4fddd2638109ff3fb081df"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bd9d334412961125e9f68d5b73c1d0ab9ea3f74a58a475e6b119f5293eee7ba"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:35d78076736f4a668d57ade00c65d30a8ce28719d8a42471b2a06ccd1a2e3063"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:824dff4f9f4d0f59d0fa3577932ee9a20e09edec8a2f813e1d6b9f89ced8293f"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:52b8b4e06fc15519019e128abedaeb56412b106ab88b3c452188ca47a25c4093"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eae569fb1e7559d4f3919965617bb39f9e753967fae55ce13454bec2d1c54f09"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:69b97aa5792428f321f72aeb2f118e56893371f27e0b7d05750bcad06fc42ca1"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4d79aad0ad4b980663316f26d9a492e8fab2af77c69c0f33780a56843ad2f89e"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d6577140cd7db19e430661e4b2653680194ea8c22c994bc65b7a19d8ec834403"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:9860d455847cd98eb67897f5957b7cd69fbcb436dd3f06099230f16a66e66f79"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:69ff36d3f8f5652994e08bd22f093e11cfd0444cea310f92e01b45a4e46b624e"},
{file = "aiohttp-3.9.4-cp311-cp311-win32.whl", hash = "sha256:e27d3b5ed2c2013bce66ad67ee57cbf614288bda8cdf426c8d8fe548316f1b5f"},
{file = "aiohttp-3.9.4-cp311-cp311-win_amd64.whl", hash = "sha256:d6a67e26daa686a6fbdb600a9af8619c80a332556245fa8e86c747d226ab1a1e"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:c5ff8ff44825736a4065d8544b43b43ee4c6dd1530f3a08e6c0578a813b0aa35"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d12a244627eba4e9dc52cbf924edef905ddd6cafc6513849b4876076a6f38b0e"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:dcad56c8d8348e7e468899d2fb3b309b9bc59d94e6db08710555f7436156097f"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4f7e69a7fd4b5ce419238388e55abd220336bd32212c673ceabc57ccf3d05b55"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c4870cb049f10d7680c239b55428916d84158798eb8f353e74fa2c98980dcc0b"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3b2feaf1b7031ede1bc0880cec4b0776fd347259a723d625357bb4b82f62687b"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:939393e8c3f0a5bcd33ef7ace67680c318dc2ae406f15e381c0054dd658397de"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7d2334e387b2adcc944680bebcf412743f2caf4eeebd550f67249c1c3696be04"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:e0198ea897680e480845ec0ffc5a14e8b694e25b3f104f63676d55bf76a82f1a"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:e40d2cd22914d67c84824045861a5bb0fb46586b15dfe4f046c7495bf08306b2"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:aba80e77c227f4234aa34a5ff2b6ff30c5d6a827a91d22ff6b999de9175d71bd"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:fb68dc73bc8ac322d2e392a59a9e396c4f35cb6fdbdd749e139d1d6c985f2527"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:f3460a92638dce7e47062cf088d6e7663adb135e936cb117be88d5e6c48c9d53"},
{file = "aiohttp-3.9.4-cp312-cp312-win32.whl", hash = "sha256:32dc814ddbb254f6170bca198fe307920f6c1308a5492f049f7f63554b88ef36"},
{file = "aiohttp-3.9.4-cp312-cp312-win_amd64.whl", hash = "sha256:63f41a909d182d2b78fe3abef557fcc14da50c7852f70ae3be60e83ff64edba5"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:c3770365675f6be220032f6609a8fbad994d6dcf3ef7dbcf295c7ee70884c9af"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:305edae1dea368ce09bcb858cf5a63a064f3bff4767dec6fa60a0cc0e805a1d3"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6f121900131d116e4a93b55ab0d12ad72573f967b100e49086e496a9b24523ea"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b71e614c1ae35c3d62a293b19eface83d5e4d194e3eb2fabb10059d33e6e8cbf"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:419f009fa4cfde4d16a7fc070d64f36d70a8d35a90d71aa27670bba2be4fd039"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7b39476ee69cfe64061fd77a73bf692c40021f8547cda617a3466530ef63f947"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b33f34c9c7decdb2ab99c74be6443942b730b56d9c5ee48fb7df2c86492f293c"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c78700130ce2dcebb1a8103202ae795be2fa8c9351d0dd22338fe3dac74847d9"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:268ba22d917655d1259af2d5659072b7dc11b4e1dc2cb9662fdd867d75afc6a4"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:17e7c051f53a0d2ebf33013a9cbf020bb4e098c4bc5bce6f7b0c962108d97eab"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:7be99f4abb008cb38e144f85f515598f4c2c8932bf11b65add0ff59c9c876d99"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d58a54d6ff08d2547656356eea8572b224e6f9bbc0cf55fa9966bcaac4ddfb10"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:7673a76772bda15d0d10d1aa881b7911d0580c980dbd16e59d7ba1422b2d83cd"},
{file = "aiohttp-3.9.4-cp38-cp38-win32.whl", hash = "sha256:e4370dda04dc8951012f30e1ce7956a0a226ac0714a7b6c389fb2f43f22a250e"},
{file = "aiohttp-3.9.4-cp38-cp38-win_amd64.whl", hash = "sha256:eb30c4510a691bb87081192a394fb661860e75ca3896c01c6d186febe7c88530"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:84e90494db7df3be5e056f91412f9fa9e611fbe8ce4aaef70647297f5943b276"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7d4845f8501ab28ebfdbeab980a50a273b415cf69e96e4e674d43d86a464df9d"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:69046cd9a2a17245c4ce3c1f1a4ff8c70c7701ef222fce3d1d8435f09042bba1"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b73a06bafc8dcc508420db43b4dd5850e41e69de99009d0351c4f3007960019"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:418bb0038dfafeac923823c2e63226179976c76f981a2aaad0ad5d51f2229bca"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:71a8f241456b6c2668374d5d28398f8e8cdae4cce568aaea54e0f39359cd928d"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:935c369bf8acc2dc26f6eeb5222768aa7c62917c3554f7215f2ead7386b33748"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:74e4e48c8752d14ecfb36d2ebb3d76d614320570e14de0a3aa7a726ff150a03c"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:916b0417aeddf2c8c61291238ce25286f391a6acb6f28005dd9ce282bd6311b6"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:9b6787b6d0b3518b2ee4cbeadd24a507756ee703adbac1ab6dc7c4434b8c572a"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:221204dbda5ef350e8db6287937621cf75e85778b296c9c52260b522231940ed"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:10afd99b8251022ddf81eaed1d90f5a988e349ee7d779eb429fb07b670751e8c"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2506d9f7a9b91033201be9ffe7d89c6a54150b0578803cce5cb84a943d075bc3"},
{file = "aiohttp-3.9.4-cp39-cp39-win32.whl", hash = "sha256:e571fdd9efd65e86c6af2f332e0e95dad259bfe6beb5d15b3c3eca3a6eb5d87b"},
{file = "aiohttp-3.9.4-cp39-cp39-win_amd64.whl", hash = "sha256:7d29dd5319d20aa3b7749719ac9685fbd926f71ac8c77b2477272725f882072d"},
{file = "aiohttp-3.9.4.tar.gz", hash = "sha256:6ff71ede6d9a5a58cfb7b6fffc83ab5d4a63138276c771ac91ceaaddf5459644"},
]
[package.dependencies]
@@ -2900,4 +2900,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "df7161da4fdc3cba0a445176fc9dda2a0e8a53e13a7aa8a864385ca259381b41"
content-hash = "b3452b50901123fd5f2c385ce8a0c1c492296393b8a7926a322b6df0ea3ac572"

View File

@@ -200,6 +200,12 @@ struct ProxyCliArgs {
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
metric_backup_collection_chunk_size: usize,
/// Whether to retry the connection to the compute node
#[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
connect_to_compute_retry: String,
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -331,7 +337,6 @@ async fn main() -> anyhow::Result<()> {
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &regional_redis_client {
@@ -357,7 +362,6 @@ async fn main() -> anyhow::Result<()> {
config,
proxy_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
@@ -372,7 +376,6 @@ async fn main() -> anyhow::Result<()> {
config,
serverless_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
}
@@ -533,7 +536,11 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(endpoint_rps_limit));
let api =
console::provider::neon::Api::new(endpoint, caches, locks, endpoint_rate_limiter);
let api = console::provider::ConsoleBackend::Console(api);
auth::BackendType::Console(MaybeOwned::Owned(api), ())
}
@@ -567,8 +574,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let mut redis_rps_limit = args.redis_rps_limit.clone();
RateBucketInfo::validate(&mut redis_rps_limit)?;
@@ -581,11 +586,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
authentication_config,
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
redis_rps_limit,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
aws_region: args.aws_region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_to_compute_retry_config: config::RetryConfig::parse(
&args.connect_to_compute_retry,
)?,
}));
Ok(config)

View File

@@ -70,20 +70,14 @@ impl EndpointsCache {
if !self.ready.load(Ordering::Acquire) {
return true;
}
// If cache is disabled, just collect the metrics and return.
if self.config.disable_cache {
let rejected = self.should_reject(endpoint);
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
return true;
}
// If the limiter allows, we don't need to check the cache.
if self.limiter.lock().await.check() {
return true;
}
let rejected = self.should_reject(endpoint);
info!(?rejected, "check endpoint is valid, enabled cache");
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
// If cache is disabled, just collect the metrics and return or
// If the limiter allows, we don't need to check the cache.
if self.config.disable_cache || self.limiter.lock().await.check() {
return true;
}
!rejected
}
fn should_reject(&self, endpoint: &EndpointId) -> bool {

View File

@@ -29,11 +29,12 @@ pub struct ProxyConfig {
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub redis_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,
pub aws_region: String,
pub wake_compute_retry_config: RetryConfig,
pub connect_to_compute_retry_config: RetryConfig,
}
#[derive(Debug)]
@@ -518,6 +519,59 @@ impl FromStr for ProjectInfoCacheOptions {
}
}
/// This is a config for connect to compute and wake compute.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
/// Number of times we should retry.
pub max_retries: u32,
/// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0
pub base_delay: tokio::time::Duration,
/// Exponential base for retry wait duration
pub backoff_factor: f64,
}
impl RetryConfig {
/// Default options for RetryConfig.
/// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s.
pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
"num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0";
/// Total delay for 4 retries with 1s base delay and 2.0 backoff factor is 7s.
/// Cplane has timeout of 60s on each request.
pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
"num_retries=4,base_retry_wait_duration=1s,retry_wait_exponent_base=2.0";
/// Parse retry options passed via cmdline.
/// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].
pub fn parse(options: &str) -> anyhow::Result<Self> {
let mut num_retries = None;
let mut base_retry_wait_duration = None;
let mut retry_wait_exponent_base = None;
for option in options.split(',') {
let (key, value) = option
.split_once('=')
.with_context(|| format!("bad key-value pair: {option}"))?;
match key {
"num_retries" => num_retries = Some(value.parse()?),
"base_retry_wait_duration" => {
base_retry_wait_duration = Some(humantime::parse_duration(value)?)
}
"retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
unknown => bail!("unknown key: {unknown}"),
}
}
Ok(Self {
max_retries: num_retries.context("missing `num_retries`")?,
base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?,
backoff_factor: retry_wait_exponent_base
.context("missing `retry_wait_exponent_base`")?,
})
}
}
/// Helper for cmdline cache options parsing.
pub struct WakeComputeLockOptions {
/// The number of shards the lock map should have

View File

@@ -208,6 +208,9 @@ pub mod errors {
#[error(transparent)]
ApiError(ApiError),
#[error("Too many connections attempts")]
TooManyConnections,
#[error("Timeout waiting to acquire wake compute lock")]
TimeoutError,
}
@@ -240,6 +243,8 @@ pub mod errors {
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
TooManyConnections => self.to_string(),
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
}
}
@@ -250,6 +255,7 @@ pub mod errors {
match self {
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
}
}

View File

@@ -12,6 +12,7 @@ use crate::{
console::messages::ColdStartInfo,
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
scram, Normalize,
};
use crate::{cache::Cached, context::RequestMonitoring};
@@ -25,6 +26,7 @@ pub struct Api {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
pub locks: &'static ApiLocks,
pub endpoint_rate_limiter: Arc<EndpointRateLimiter>,
jwt: String,
}
@@ -34,6 +36,7 @@ impl Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
locks: &'static ApiLocks,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> Self {
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
@@ -43,6 +46,7 @@ impl Api {
endpoint,
caches,
locks,
endpoint_rate_limiter,
jwt,
}
}
@@ -277,6 +281,14 @@ impl super::Api for Api {
return Ok(cached);
}
// check rate limit
if !self
.endpoint_rate_limiter
.check(user_info.endpoint.normalize().into(), 1)
{
return Err(WakeComputeError::TooManyConnections);
}
let permit = self.locks.get_wake_compute_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled

View File

@@ -51,7 +51,7 @@ pub struct RequestMonitoring {
sender: Option<mpsc::UnboundedSender<RequestData>>,
pub latency_timer: LatencyTimer,
// Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
rejected: bool,
rejected: Option<bool>,
}
#[derive(Clone, Debug)]
@@ -96,7 +96,7 @@ impl RequestMonitoring {
error_kind: None,
auth_method: None,
success: false,
rejected: false,
rejected: None,
cold_start_info: ColdStartInfo::Unknown,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
@@ -118,7 +118,7 @@ impl RequestMonitoring {
}
pub fn set_rejected(&mut self, rejected: bool) {
self.rejected = rejected;
self.rejected = Some(rejected);
}
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
@@ -200,27 +200,28 @@ impl Drop for RequestMonitoring {
} else {
ConnectOutcome::Failed
};
let rejected = self.rejected;
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?ep,
?outcome,
?rejected,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
if let Some(rejected) = self.rejected {
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?outcome,
?rejected,
?ep,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
}
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}

View File

@@ -119,6 +119,10 @@ pub struct ProxyMetrics {
/// Number of invalid endpoints (per protocol, per rejected).
pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
/// Number of retries (per outcome, per retry_type).
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
}
#[derive(MetricGroup)]
@@ -480,3 +484,16 @@ pub struct InvalidEndpointsGroup {
pub rejected: Bool,
pub outcome: ConnectOutcome,
}
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
pub outcome: ConnectOutcome,
pub retry_type: RetryType,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
WakeCompute,
ConnectToCompute,
}

View File

@@ -19,9 +19,8 @@ use crate::{
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::WithClientIp,
proxy::handshake::{handshake, HandshakeData},
rate_limiter::EndpointRateLimiter,
stream::{PqStream, Stream},
EndpointCacheKey, Normalize,
EndpointCacheKey,
};
use futures::TryFutureExt;
use itertools::Itertools;
@@ -61,7 +60,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -86,7 +84,6 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
@@ -128,7 +125,6 @@ pub async fn task_main(
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter,
conn_gauge,
)
.instrument(span.clone())
@@ -242,7 +238,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
@@ -288,15 +283,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
Err(e) => stream.throw_error(e).await?,
};
// check rate limit
if let Some(ep) = user_info.get_endpoint() {
if !endpoint_rate_limiter.check(ep.normalize(), 1) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await?;
}
}
let user = user_info.get_user().to_owned();
let user_info = match user_info
.authenticate(
@@ -322,6 +308,8 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
&TcpMechanism { params: &params },
&user_info,
mode.allow_self_signed_compute(config),
config.wake_compute_retry_config,
config.connect_to_compute_retry_config,
)
.or_else(|e| stream.throw_error(e))
.await?;

View File

@@ -1,10 +1,11 @@
use crate::{
auth::backend::ComputeCredentialKeys,
compute::{self, PostgresConnection},
config::RetryConfig,
console::{self, errors::WakeComputeError, CachedNodeInfo, NodeInfo},
context::RequestMonitoring,
error::ReportableError,
metrics::{ConnectionFailureKind, Metrics},
metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
proxy::{
retry::{retry_after, ShouldRetry},
wake_compute::wake_compute,
@@ -93,19 +94,23 @@ pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
mechanism: &M,
user_info: &B,
allow_self_signed_compute: bool,
wake_compute_retry_config: RetryConfig,
connect_to_compute_retry_config: RetryConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
let mut node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
if let Some(keys) = user_info.get_keys() {
node_info.set_keys(keys);
}
node_info.allow_self_signed_compute = allow_self_signed_compute;
// let mut node_info = credentials.get_node_info(ctx, user_info).await?;
mechanism.update_connect_config(&mut node_info.config);
let retry_type = RetryType::ConnectToCompute;
// try once
let err = match mechanism
@@ -114,6 +119,13 @@ where
{
Ok(res) => {
ctx.latency_timer.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
},
num_retries.into(),
);
return Ok(res);
}
Err(e) => e,
@@ -124,7 +136,7 @@ where
let node_info = if !node_info.cached() {
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries) {
if !err.should_retry(num_retries, connect_to_compute_retry_config) {
return Err(err.into());
}
node_info
@@ -132,7 +144,8 @@ where
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
let old_node_info = invalidate_cache(node_info);
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
let mut node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
node_info.reuse_settings(old_node_info);
mechanism.update_connect_config(&mut node_info.config);
@@ -151,19 +164,34 @@ where
{
Ok(res) => {
ctx.latency_timer.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
},
num_retries.into(),
);
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
Err(e) => {
let retriable = e.should_retry(num_retries);
let retriable = e.should_retry(num_retries, connect_to_compute_retry_config);
if !retriable {
error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type,
},
num_retries.into(),
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
}
}
let wait_duration = retry_after(num_retries);
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
num_retries += 1;
time::sleep(wait_duration).await;

View File

@@ -1,18 +1,12 @@
use crate::compute;
use crate::{compute, config::RetryConfig};
use std::{error::Error, io};
use tokio::time;
/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0
pub const NUM_RETRIES_CONNECT: u32 = 16;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25);
const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
pub trait ShouldRetry {
fn could_retry(&self) -> bool;
fn should_retry(&self, num_retries: u32) -> bool {
fn should_retry(&self, num_retries: u32, config: RetryConfig) -> bool {
match self {
_ if num_retries >= NUM_RETRIES_CONNECT => false,
_ if num_retries >= config.max_retries => false,
err => err.could_retry(),
}
}
@@ -63,6 +57,8 @@ impl ShouldRetry for compute::ConnectionError {
}
}
pub fn retry_after(num_retries: u32) -> time::Duration {
BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1))
pub fn retry_after(num_retries: u32, config: RetryConfig) -> time::Duration {
config
.base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
}

View File

@@ -10,13 +10,13 @@ use super::*;
use crate::auth::backend::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned, TestBackend,
};
use crate::config::CertResolver;
use crate::config::{CertResolver, RetryConfig};
use crate::console::caches::NodeInfoCache;
use crate::console::messages::MetricsAuxInfo;
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
use crate::proxy::retry::retry_after;
use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId};
use anyhow::{bail, Context};
use async_trait::async_trait;
@@ -361,11 +361,15 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
#[test]
fn connect_compute_total_wait() {
let mut total_wait = tokio::time::Duration::ZERO;
for num_retries in 1..NUM_RETRIES_CONNECT {
total_wait += retry_after(num_retries);
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
for num_retries in 1..config.max_retries {
total_wait += retry_after(num_retries, config);
}
assert!(total_wait < tokio::time::Duration::from_secs(12));
assert!(total_wait > tokio::time::Duration::from_secs(10));
assert!(f64::abs(total_wait.as_secs_f64() - 15.0) < 0.1);
}
#[derive(Clone, Copy, Debug)]
@@ -549,7 +553,12 @@ async fn connect_to_compute_success() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -562,7 +571,12 @@ async fn connect_to_compute_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -576,7 +590,12 @@ async fn connect_to_compute_non_retry_1() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap_err();
mechanism.verify();
@@ -590,7 +609,12 @@ async fn connect_to_compute_non_retry_2() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -600,17 +624,32 @@ async fn connect_to_compute_non_retry_2() {
#[tokio::test]
async fn connect_to_compute_non_retry_3() {
let _ = env_logger::try_init();
assert_eq!(NUM_RETRIES_CONNECT, 16);
tokio::time::pause();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![
Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
Retry, Retry, Retry, Retry, Retry, /* the 17th time */ Retry,
]);
let mechanism =
TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
let wake_compute_retry_config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 1,
backoff_factor: 2.0,
};
let connect_to_compute_retry_config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(
&mut ctx,
&mechanism,
&user_info,
false,
wake_compute_retry_config,
connect_to_compute_retry_config,
)
.await
.unwrap_err();
mechanism.verify();
}
@@ -622,7 +661,12 @@ async fn wake_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -636,7 +680,12 @@ async fn wake_non_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
let config = RetryConfig {
base_delay: Duration::from_secs(1),
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -1,10 +1,14 @@
use crate::config::RetryConfig;
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
use crate::context::RequestMonitoring;
use crate::metrics::{ConnectionFailuresBreakdownGroup, Metrics, WakeupFailureKind};
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
WakeupFailureKind,
};
use crate::proxy::retry::retry_after;
use hyper::StatusCode;
use std::ops::ControlFlow;
use tracing::{error, warn};
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
use super::retry::ShouldRetry;
@@ -13,23 +17,42 @@ pub async fn wake_compute<B: ComputeConnectBackend>(
num_retries: &mut u32,
ctx: &mut RequestMonitoring,
api: &B,
config: RetryConfig,
) -> Result<CachedNodeInfo, WakeComputeError> {
let retry_type = RetryType::WakeCompute;
loop {
let wake_res = api.wake_compute(ctx).await;
match handle_try_wake(wake_res, *num_retries) {
match handle_try_wake(wake_res, *num_retries, config) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type,
},
(*num_retries).into(),
);
return Err(e);
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
report_error(&e, true);
}
Ok(ControlFlow::Break(n)) => return Ok(n),
Ok(ControlFlow::Break(n)) => {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
},
(*num_retries).into(),
);
info!(?num_retries, "compute node woken up after");
return Ok(n);
}
}
let wait_duration = retry_after(*num_retries);
let wait_duration = retry_after(*num_retries, config);
*num_retries += 1;
tokio::time::sleep(wait_duration).await;
}
@@ -42,10 +65,11 @@ pub async fn wake_compute<B: ComputeConnectBackend>(
pub fn handle_try_wake(
result: Result<CachedNodeInfo, WakeComputeError>,
num_retries: u32,
config: RetryConfig,
) -> Result<ControlFlow<CachedNodeInfo, WakeComputeError>, WakeComputeError> {
match result {
Err(err) => match &err {
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
WakeComputeError::ApiError(api) if api.should_retry(num_retries, config) => {
Ok(ControlFlow::Continue(err))
}
_ => Err(err),
@@ -90,6 +114,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError,
};
Metrics::get()

View File

@@ -15,7 +15,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
use crate::EndpointId;
use crate::intern::EndpointIdInt;
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
@@ -61,12 +61,7 @@ impl GlobalRateLimiter {
// Purposefully ignore user name and database name as clients can reconnect
// with different names, so we'll end up sending some http requests to
// the control plane.
//
// We also may save quite a lot of CPU (I think) by bailing out right after we
// saw SNI, before doing TLS handshake. User-side error messages in that case
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
// I went with a more expensive way that yields user-friendlier error messages.
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
pub type EndpointRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
@@ -245,7 +240,7 @@ mod tests {
use tokio::time;
use super::{BucketRateLimiter, EndpointRateLimiter};
use crate::{rate_limiter::RateBucketInfo, EndpointId};
use crate::{intern::EndpointIdInt, rate_limiter::RateBucketInfo, EndpointId};
#[test]
fn rate_bucket_rpi() {
@@ -295,39 +290,40 @@ mod tests {
let limiter = EndpointRateLimiter::new(rates);
let endpoint = EndpointId::from("ep-my-endpoint-1234");
let endpoint = EndpointIdInt::from(endpoint);
time::pause();
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
// more connections fail
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// fail even after 500ms as it's in the same bucket
time::advance(time::Duration::from_millis(500)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after a full 1s, 100 requests are allowed again
time::advance(time::Duration::from_millis(500)).await;
for _ in 1..6 {
for _ in 0..50 {
assert!(limiter.check(endpoint.clone(), 2));
assert!(limiter.check(endpoint, 2));
}
time::advance(time::Duration::from_millis(1000)).await;
}
// more connections after 600 will exceed the 20rps@30s limit
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// will still fail before the 30 second limit
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after the full 30 seconds, 100 requests are allowed again
time::advance(time::Duration::from_millis(1)).await;
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
}

View File

@@ -35,7 +35,6 @@ use crate::context::RequestMonitoring;
use crate::metrics::Metrics;
use crate::protocol2::WithClientIp;
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
@@ -53,7 +52,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -117,7 +115,6 @@ pub async fn task_main(
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
@@ -147,7 +144,6 @@ async fn connection_handler(
backend: Arc<PoolingBackend>,
connections: TaskTracker,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
server: Builder<TokioExecutor>,
tls_acceptor: TlsAcceptor,
@@ -231,7 +227,6 @@ async fn connection_handler(
cancellation_handler.clone(),
session_id,
peer_addr,
endpoint_rate_limiter.clone(),
http_request_token,
)
.in_current_span()
@@ -270,7 +265,6 @@ async fn request_handler(
cancellation_handler: Arc<CancellationHandlerMain>,
session_id: uuid::Uuid,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
) -> Result<Response<Full<Bytes>>, ApiError> {
@@ -298,15 +292,9 @@ async fn request_handler(
ws_connections.spawn(
async move {
if let Err(e) = websocket::serve_websocket(
config,
ctx,
websocket,
cancellation_handler,
host,
endpoint_rate_limiter,
)
.await
if let Err(e) =
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
.await
{
error!("error in websocket connection: {e:#}");
}

View File

@@ -108,6 +108,8 @@ impl PoolingBackend {
},
&backend,
false, // do not allow self signed compute for http flow
self.config.wake_compute_retry_config,
self.config.connect_to_compute_retry_config,
)
.await
}

View File

@@ -5,7 +5,6 @@ use crate::{
error::{io_error, ReportableError},
metrics::Metrics,
proxy::{handle_client, ClientMode},
rate_limiter::EndpointRateLimiter,
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream};
@@ -136,7 +135,6 @@ pub async fn serve_websocket(
websocket: HyperWebsocket,
cancellation_handler: Arc<CancellationHandlerMain>,
hostname: Option<String>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
let conn_gauge = Metrics::get()
@@ -150,7 +148,6 @@ pub async fn serve_websocket(
cancellation_handler,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },
endpoint_rate_limiter,
conn_gauge,
)
.await;

View File

@@ -33,7 +33,7 @@ psutil = "^5.9.4"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
aiohttp = "3.9.2"
aiohttp = "3.9.4"
pytest-rerunfailures = "^13.0"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"

View File

@@ -129,7 +129,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_read_num_fs_layers"),
*histogram("pageserver_layers_visited_per_read_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),

View File

@@ -0,0 +1,93 @@
import os
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.workload import Workload
AGGRESIVE_COMPACTION_TENANT_CONF = {
# Disable gc and compaction. The test runs compaction manually.
"gc_period": "0s",
"compaction_period": "0s",
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024**2,
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 2,
# INC-186: remove when merging the fix
"image_layer_creation_check_threshold": 0,
}
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
a small number of rows and manually instructs the pageserver to run compaction
between iterations. At the end of the test validate that the average number of
layers visited to gather reconstruct data for a given key is within the empirically
observed bounds.
"""
# Effectively disable the page cache to rely only on image layers
# to shorten reads.
neon_env_builder.pageserver_config_override = """
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 10000
churn_rounds = 100
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
ps_http.timeline_compact(tenant_id, timeline_id)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
log.info("Checking layer access metrics ...")
layer_access_metric_names = [
"pageserver_layers_visited_per_read_global_sum",
"pageserver_layers_visited_per_read_global_count",
"pageserver_layers_visited_per_read_global_bucket",
"pageserver_layers_visited_per_vectored_read_global_sum",
"pageserver_layers_visited_per_vectored_read_global_count",
"pageserver_layers_visited_per_vectored_read_global_bucket",
]
metrics = env.pageserver.http_client().get_metrics()
for name in layer_access_metric_names:
layer_access_metrics = metrics.query_all(name)
log.info(f"Got metrics: {layer_access_metrics}")
non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
non_vectored_average = non_vectored_sum / non_vectored_count
vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
vectored_average = vectored_sum / vectored_count
log.info(f"{non_vectored_average=} {vectored_average=}")
# The upper bound for average number of layer visits below (8)
# was chosen empirically for this workload.
assert non_vectored_average < 8
assert vectored_average < 8

View File

@@ -1,6 +1,7 @@
import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional
@@ -582,6 +583,91 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
"""
Slow test that runs in realtime, checks that the background scheduling of secondary
downloads happens as expected.
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
# Create this many tenants, each with two timelines
tenant_count = 4
tenant_timelines = {}
# This mirrors a constant in `downloader.rs`
freshen_interval_secs = 60
for _i in range(0, tenant_count):
tenant_id = TenantId.generate()
timeline_a = TimelineId.generate()
timeline_b = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id,
timeline_a,
placement_policy='{"Attached":1}',
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
# to trigger the upload promptly.
conf={"heatmap_period": "1s"},
)
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
tenant_timelines[tenant_id] = [timeline_a, timeline_b]
t_start = time.time()
# Wait long enough that the background downloads should happen; we expect all the inital layers
# of all the initial timelines to show up on the secondary location of each tenant.
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
for timeline_id in timelines:
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
# One or more layers should be present for all timelines
assert list_layers(ps_secondary, tenant_id, timeline_id)
# Delete the second timeline: this should be reflected later on the secondary
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
# Wait long enough for the secondary locations to see the deletion
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# This one was not deleted
assert list_layers(ps_secondary, tenant_id, timelines[0])
# This one was deleted
assert not list_layers(ps_secondary, tenant_id, timelines[1])
t_end = time.time()
# Measure how many heatmap downloads we did in total: this checks that we succeeded with
# proper scheduling, and not some bug that just runs downloads in a loop.
total_heatmap_downloads = 0
for ps in env.pageservers:
v = ps.http_client().get_metric_value("pageserver_secondary_download_heatmap_total")
assert v is not None
total_heatmap_downloads += int(v)
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
expect_download_rate = 1.0 / freshen_interval_secs
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
assert download_rate < expect_download_rate * 2
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
@pytest.mark.parametrize("via_controller", [True, False])
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):

View File

@@ -1201,3 +1201,45 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
"""
Check that an unlogged relation is handled properly on a sharded tenant
Reproducer for https://github.com/neondatabase/neon/issues/7451
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(tenant_id, timeline_id, shard_count=8)
# We will create many tables to ensure it's overwhelmingly likely that at least one
# of them doesn't land on shard 0
table_names = [f"my_unlogged_{i}" for i in range(0, 16)]
with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
for table_name in table_names:
ep.safe_psql(f"CREATE UNLOGGED TABLE {table_name} (id integer, value varchar(64));")
ep.safe_psql(f"INSERT INTO {table_name} VALUES (1, 'foo')")
result = ep.safe_psql(f"SELECT * from {table_name};")
assert result == [(1, "foo")]
ep.safe_psql(f"CREATE INDEX ON {table_name} USING btree (value);")
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
for table_name in table_names:
# Check that table works: we can select and insert
result = ep.safe_psql(f"SELECT * from {table_name};")
assert result == []
ep.safe_psql(f"INSERT INTO {table_name} VALUES (2, 'bar');")
result = ep.safe_psql(f"SELECT * from {table_name};")
assert result == [(2, "bar")]
# Ensure that post-endpoint-restart modifications are ingested happily by pageserver
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)

View File

@@ -273,7 +273,8 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
but imports the generation number.
"""
neon_env_builder.num_pageservers = 2
# One pageserver to simulate legacy environment, two to be managed by storage controller
neon_env_builder.num_pageservers = 3
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
@@ -288,10 +289,10 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant
# These are the pageservers managed by the sharding service, where the tenant
# will be attached after onboarding
env.pageservers[1].start()
dest_ps = env.pageservers[1]
env.pageservers[2].start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
for sk in env.safekeepers:
@@ -330,6 +331,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
virtual_ps_http.tenant_secondary_download(tenant_id)
warm_up_ps = env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
# Call into storage controller to onboard the tenant
generation += 1
@@ -344,6 +348,18 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
assert len(r["shards"]) == 1
describe = env.storage_controller.tenant_describe(tenant_id)["shards"][0]
dest_ps_id = describe["node_attached"]
dest_ps = env.get_pageserver(dest_ps_id)
if warm_up:
# The storage controller should have attached the tenant to the same placce
# it had a secondary location, otherwise there was no point warming it up
assert dest_ps_id == warm_up_ps
# It should have been given a new secondary location as well
assert len(describe["node_secondary"]) == 1
assert describe["node_secondary"][0] != warm_up_ps
# As if doing a live migration, detach the original pageserver
origin_ps.http_client().tenant_location_conf(
tenant_id,
@@ -415,6 +431,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
dest_tenant_after_conf_change["generation"] == dest_tenant_before_conf_change["generation"]
)
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
# Storage controller auto-sets heatmap period, ignore it for the comparison
del dest_tenant_conf_after.tenant_specific_overrides["heatmap_period"]
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf
env.storage_controller.consistency_check()