Compare commits

..

25 Commits

Author SHA1 Message Date
Ivan Efremov
bea1580af2 DO NOT MERGE [proxy]: Add geo-based routing for replicated projects 2025-05-24 12:15:16 +03:00
Dmitrii Kovalkov
136eaeb74a pageserver: basebackup cache (hackathon project) (#11989)
## Problem
Basebackup cache is on the hot path of compute startup and is generated
on every request (may be slow).

- Issue: https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Add `BasebackupCache` which stores basebackups on local disk.
- Basebackup prepare requests are triggered by
`XLOG_CHECKPOINT_SHUTDOWN` records in the log.
- Limit the size of the cache by number of entries.
- Add `basebackup_cache_enabled` feature flag to TenantConfig.
- Write tests for the cache

## Not implemented yet
- Limit the size of the cache by total size in bytes

---------

Co-authored-by: Aleksandr Sarantsev <aleksandr@neon.tech>
2025-05-22 12:45:00 +00:00
Erik Grinaker
211b824d62 pageserver: add branch-local consumption metrics (#11852)
## Problem

For billing, we'd like per-branch consumption metrics.

Requires https://github.com/neondatabase/neon/pull/11984.
Resolves https://github.com/neondatabase/cloud/issues/28155.

## Summary of changes

This patch adds two new consumption metrics:

* `written_size_since_parent`: `written_size - ancestor_lsn`
* `pitr_history_size_since_parent`: `written_size - max(pitr_cutoff,
ancestor_lsn)`

Note that `pitr_history_size_since_parent` will not be emitted until the
PITR cutoff has been computed, and may or may not increase ~immediately
when a user increases their PITR window (depending on how much history
we have available and whether the tenant is restarted/migrated).
2025-05-22 12:26:32 +00:00
Peter Bendel
f9fdbc9618 remove auth_endpoint password from log and command line for local proxy mode (#11991)
## Problem

When testing local proxy the auth-endpoint password shows up in command
line and log

```bash
RUST_LOG=proxy LOGFMT=text cargo run --release --package proxy --bin proxy --features testing -- \
  --auth-backend postgres \
  --auth-endpoint 'postgresql://postgres:secret_password@127.0.0.1:5432/postgres' \
  --tls-cert server.crt \
  --tls-key server.key \
  --wss 0.0.0.0:4444
```

## Summary of changes

- Allow to set env variable PGPASSWORD
- fall back to use PGPASSWORD env variable when auth-endpoint does not
contain password
- remove auth-endpoint password from logs in `--features testing` mode

Example

```bash
export PGPASSWORD=secret_password

RUST_LOG=proxy LOGFMT=text cargo run --package proxy --bin proxy --features testing -- \
  --auth-backend postgres \
  --auth-endpoint 'postgresql://postgres@127.0.0.1:5432/postgres' \
  --tls-cert server.crt \
  --tls-key server.key \
  --wss 0.0.0.0:4444 
```
2025-05-21 20:26:05 +00:00
Erik Grinaker
95a5f749c8 pageserver: use an Option for GcCutoffs::time (#11984)
## Problem

It is not currently possible to disambiguate a timeline with an
uninitialized PITR cutoff from one that was created within the PITR
window -- both of these have `GcCutoffs::time == Lsn(0)`. For billing
metrics, we need to disambiguate these to avoid accidentally billing the
entire history when a tenant is initially loaded.

Touches https://github.com/neondatabase/cloud/issues/28155.

## Summary of changes

Make `GcCutoffs::time` an `Option<Lsn>`, and only set it to `Some` when
initialized. A `pitr_interval` of 0 will yield `Some(last_record_lsn)`.

This PR takes a conservative approach, and mostly retains the old
behavior of consumers by using `unwrap_or_default()` to yield 0 when
uninitialized, to avoid accidentally introducing bugs -- except in cases
where there is high confidence that the change is beneficial (e.g. for
the `pageserver_pitr_history_size` Prometheus metric and to return early
during GC).
2025-05-21 15:42:11 +00:00
Konstantin Merenkov
5db20af8a7 Keep the conn info cache on max_client_conn from pgbouncer (#11986)
## Problem
Hitting max_client_conn from pgbouncer would lead to invalidation of the
conn info cache.
Customers would hit the limit on wake_compute.

## Summary of changes
`should_retry_wake_compute` detects this specific error from pgbouncer
as non-retriable,
meaning we won't try to wake up the compute again.
2025-05-21 15:27:30 +00:00
Arpad Müller
136cf1979b Add metric for number of offloaded timelines (#11976)
We want to keep track of the number of offloaded timelines. It's a
per-tenant shard metric because each shard makes offloading decisions on
its own.
2025-05-21 11:28:22 +00:00
Vlad Lazar
08bb72e516 pageserver: allow in-mem reads to be planned during writes (#11937)
## Problem

Get page tracing revealed situations where planning an in-memory layer
is taking around 150ms. Upon investigation, the culprit is the inner
in-mem layer file lock. A batch being written holds the write lock and a
read being planned wants the read lock. See [this
trace](https://neonprod.grafana.net/explore?schemaVersion=1&panes=%7B%22j61%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%22412ec4522fe1750798aca54aec2680ac%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702606349%22,%22from%22:%221746681006349%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2291e9f1879c9bccc0%22%7D%7D%7D,%226d0%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%2220a4757706b16af0e1fbab83f9d2e925%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702614807%22,%22from%22:%221746681014807%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2260e7825512bc2a6b%22%7D%7D%7D%7D)
for example.

## Summary of changes

Lift the index into its own RwLock such that we can at least plan during
write IO.

I tried to be smarter in
https://github.com/neondatabase/neon/pull/11866: arc swap + structurally
shared datastructure
and that killed ingest perf for small keys.

## Benchmarking

* No statistically significant difference for rust inget benchmarks when
compared to main.
2025-05-21 11:08:49 +00:00
Alexander Sarantcev
6f4f3691a5 pageserver: Add tracing endpoint correctness check in config validation (#11970)
## Problem

When using an incorrect endpoint string - `"localhost:4317"`, it's a
runtime error, but it can be a config error
- Closes: https://github.com/neondatabase/neon/issues/11394

## Summary of changes

Add config parse time check via `request::Url::parse` validation.

---------

Co-authored-by: Aleksandr Sarantsev <ephemeralsad@gmail.com>
2025-05-21 09:03:26 +00:00
dependabot[bot]
a2b756843e chore(deps): bump setuptools from 70.0.0 to 78.1.1 in the pip group across 1 directory (#11977)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-20 23:00:49 +00:00
Conrad Ludgate
f3c9d0adf4 proxy(logging): significant changes to json logging internals for performance. (#11974)
#11962 

Please review each commit separately.

Each commit is rather small in goal. The overall goal of this PR is to
keep the behaviour identical, but shave away small inefficiencies here
and there.
2025-05-20 17:57:59 +00:00
Konstantin Knizhnik
2e3dc9a8c2 Add rel_size_replica_cache (#11889)
## Problem

See 
Discussion:
https://neondb.slack.com/archives/C033RQ5SPDH/p1746645666075799
Issue: https://github.com/neondatabase/cloud/issues/28609

Relation size cache is not correctly updated at PS in case of replicas.

## Summary of changes

1. Have two caches for relation size in timeline:
`rel_size_primary_cache` and `rel_size_replica_cache`.
2. `rel_size_primary_cache` is actually what we have now. The only
difference is that it is not updated in `get_rel_size`, only by WAL
ingestion
3. `rel_size_replica_cache` has limited size (LruCache) and it's key is
`(Lsn,RelTag)` . It is updated in `get_rel_size`. Only strict LSN
matches are accepted as cache hit.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-20 15:38:27 +00:00
Konstantin Merenkov
568779fa8a proxy/scram: avoid memory copy to improve performance (#11980)
Touches #11941

## Problem
Performance of our PBKDF2 was worse than reference.

## Summary of changes
Avoided memory copy when HMACing in a tight loop.
2025-05-20 15:23:54 +00:00
Alexey Kondratov
e94acbc816 fix(compute_ctl): Dollar escaping and tests (#11969)
## Problem

In the escaping path we were checking that `${tag}$` or `${outer_tag}$`
are present in the string, but that's not enough, as original string
surrounded by `$` can also form a 'tag', like `$x$xx$x$`, which is fine
on it's own, but cannot be used in the string escaped with `$xx$`.

## Summary of changes

Remove `$` from the checks, just check if `{tag}` or `{outer_tag}` are
present. Add more test cases and change the catalog test to stress the
`drop_subscriptions_before_start: true` path as well.

Fixes https://github.com/neondatabase/cloud/issues/29198
2025-05-20 09:03:36 +00:00
Erik Grinaker
f4150614d0 pageserver: don't pass config to PageHandler (#11973)
## Problem

The gRPC page service API will require decoupling the `PageHandler` from
the libpq protocol implementation. As preparation for this, avoid
passing in the entire server config to `PageHandler`, and instead
explicitly pass in the relevant fields.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Change `PageHandler` to take a `GetVectoredConcurrentIo` instead of
the entire config.
* Change `IoConcurrency::spawn_from_conf` to take a
`GetVectoredConcurrentIo`.
2025-05-19 15:47:40 +00:00
Erik Grinaker
38dbc5f67f pageserver/page_api: add binary Protobuf descriptor (#11968)
## Problem

A binary Protobuf schema descriptor can be used to expose an API
reflection service, which in turn allows convenient usage of e.g.
`grpcurl` against the gRPC server.

Touches #11728.

## Summary of changes

* Generate a binary schema descriptor as
`pageserver_page_api::proto::FILE_DESCRIPTOR_SET`.
* Opportunistically rename the Protobuf package from `page_service` to
`page_api`.
2025-05-19 11:17:45 +00:00
Folke Behrens
3685ad606d endpoint_storage: Fix metrics test by excluding assertion on macos (#11952) 2025-05-19 10:56:03 +00:00
Ivan Efremov
76a7d37f7e proxy: Drop cancellation ops if they don't fit into the queue (#11950)
Add a redis ops batch size argument for proxy and remove timeouts by
using try_send()
2025-05-19 10:10:55 +00:00
Erik Grinaker
cdb6479c8a pageserver: add gRPC page service schema (#11815)
## Problem

For the [communicator
project](https://github.com/neondatabase/company_projects/issues/352),
we want to move to gRPC for the page service protocol.

Touches #11728.

## Summary of changes

This patch adds an experimental gRPC Protobuf schema for the page
service. It is equivalent to the current page service, but with several
improvements, e.g.:

* Connection multiplexing.
* Reduced head-of-line blocking.
* Client-side batching.
* Explicit tenant shard routing.
* GetPage request classification (normal vs. prefetch).
* Explicit rate limiting ("slow down" response status).

The API is exposed as a new `pageserver/page_api` package. This is
separate from the `pageserver_api` package to reduce the dependency
footprint for the communicator. The longer-term plan is to also split
out e.g. the WAL ingestion service to a separate gRPC package, e.g.
`pageserver/wal_api`.

Subsequent PRs will: add Rust domain types for the Protobuf types,
expose a gRPC server, and implement the page service.

Preliminary prototype benchmarks of this gRPC API is within 10% of
baseline libpq performance. We'll do further benchmarking and
optimization as the implementation lands in `main` and is deployed to
staging.
2025-05-19 09:03:06 +00:00
Konstantin Knizhnik
81c557d87e Unlogged build get smgr (#11954)
## Problem

See https://github.com/neondatabase/neon/issues/11910
and https://neondb.slack.com/archives/C04DGM6SMTM/p1747314649059129

## Summary of changes

Do not change persistence in `start_unlogged_build`

Postgres PRs:
https://github.com/neondatabase/postgres/pull/642
https://github.com/neondatabase/postgres/pull/641
https://github.com/neondatabase/postgres/pull/640
https://github.com/neondatabase/postgres/pull/639

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-18 05:02:47 +00:00
Trung Dinh
e963129678 pagesteam_handle_batched_message -> pagestream_handle_batched_message (#11916)
## Problem
Found a typo in code.

## Summary of changes

Co-authored-by: Trung Dinh <tdinh@roblox.com>
Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-05-17 22:30:29 +00:00
dependabot[bot]
4f0a9fc569 chore(deps): bump flask-cors from 5.0.0 to 6.0.0 in the pip group across 1 directory (#11960)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-17 22:06:32 +00:00
Emmanuel Ferdman
81c6a5a796 Migrate to correct logger interface (#11956)
## Problem
Currently the `logger` library throws annoying deprecation warnings:
```python
DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead
```

## Summary of changes
This small PR resolves the annoying deprecation warnings by migrating to
`.warning` as suggested.

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
2025-05-17 21:12:01 +00:00
Konstantin Knizhnik
8e05639dbf Invalidate LFC after unlogged build (#11951)
## Problem


See https://neondb.slack.com/archives/C04DGM6SMTM/p1747391617951239

LFC is not always properly updated during unlogged build so it can
contain stale content.

## Summary of changes

Invalidate LFC content at the end of unlogged build

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-17 19:06:59 +00:00
Alexander Bayandin
deed46015d CI(test-images): increase timeout from 20m to 60m (#11955)
## Problem

For some reason (unknown yet) 20m timeout is not enough for
`test-images` job on arm runners.
Ref:
https://github.com/neondatabase/neon/actions/runs/15075321681/job/42387530399?pr=11953

## Summary of changes
- Increase the timeout from 20m to 1h
2025-05-17 06:34:54 +00:00
73 changed files with 3059 additions and 674 deletions

View File

@@ -963,7 +963,7 @@ jobs:
fi
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
timeout-minutes: 60
env:
TAG: >-
${{

51
Cargo.lock generated
View File

@@ -3898,6 +3898,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -4182,6 +4192,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -4286,6 +4302,7 @@ dependencies = [
"enumset",
"fail",
"futures",
"hashlink",
"hex",
"hex-literal",
"http-utils",
@@ -4434,6 +4451,16 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.2.1"
@@ -5228,6 +5255,7 @@ dependencies = [
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-test",
"tracing-utils",
"try-lock",
"typed-json",
@@ -7678,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -7691,6 +7720,27 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.100",
]
[[package]]
name = "tracing-utils"
version = "0.1.0"
@@ -8543,6 +8593,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"url",
"uuid",
"zeroize",

View File

@@ -9,6 +9,7 @@ members = [
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -252,6 +253,7 @@ pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
RelationGetRelationName(index));
+#ifdef NEON_SMGR
+ smgr_start_unlogged_build(index->rd_smgr);
+ smgr_start_unlogged_build(RelationGetSmgr(index));
+#endif
+
initRumState(&buildstate.rumstate, index);
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
/*
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
}
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(index->rd_smgr);
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
/*

View File

@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
// worst case it won't take long. Outer tag is always `tag + "x"`,
// so if `tag` is not present in the string, `outer_tag` is not
// present in the string either.
while self.contains(&tag.to_string()) {
tag += "x";
outer_tag = tag.clone() + "x";
}

View File

@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
("x", ("$xx$x$xx$", "xxx")),
("xx", ("$xxx$xx$xxx$", "xxxx")),
("$x", ("$xx$$x$xx$", "xxx")),
("x$", ("$xx$x$$xx$", "xxx")),
("$x$", ("$xx$$x$$xx$", "xxx")),
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
];
for (input, expected) in test_cases {

View File

@@ -546,6 +546,16 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Falied to parse 'sampling_ratio'")?,
relsize_snapshot_cache_capacity: settings
.remove("relsize snapshot cache capacity")
.map(|x| x.parse::<usize>())
.transpose()
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
basebackup_cache_enabled: settings
.remove("basebackup_cache_enabled")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'basebackup_cache_enabled' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
#[cfg(target_os = "linux")]
assert!(body.contains("process_threads"));
}

View File

@@ -183,6 +183,8 @@ pub struct ConfigToml {
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -235,7 +237,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
ScatteredLsn,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
@@ -308,6 +310,26 @@ pub struct TimelineImportConfig {
pub import_job_checkpoint_threshold: NonZeroUsize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_size_entries: 1000,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -491,6 +513,14 @@ pub struct TenantConfigToml {
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
/// that will get perf sampling for the tenant.
pub sampling_ratio: Option<Ratio>,
/// Capacity of relsize snapshot cache (used by replicas).
pub relsize_snapshot_cache_capacity: usize,
/// Enable preparing basebackup on XLOG_CHECKPOINT_SHUTDOWN and using it in basebackup requests.
// FIXME: Remove skip_serializing_if when the feature is stable.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub basebackup_cache_enabled: bool,
}
pub mod defaults {
@@ -664,6 +694,7 @@ impl Default for ConfigToml {
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
},
basebackup_cache_config: None,
}
}
}
@@ -730,6 +761,7 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000;
}
impl Default for TenantConfigToml {
@@ -787,6 +819,8 @@ impl Default for TenantConfigToml {
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
basebackup_cache_enabled: false,
}
}
}

View File

@@ -630,6 +630,10 @@ pub struct TenantConfigPatch {
pub gc_compaction_ratio_percent: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub sampling_ratio: FieldPatch<Option<Ratio>>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub basebackup_cache_enabled: FieldPatch<bool>,
}
/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -759,6 +763,12 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub sampling_ratio: Option<Option<Ratio>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub relsize_snapshot_cache_capacity: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_enabled: Option<bool>,
}
impl TenantConfig {
@@ -804,6 +814,8 @@ impl TenantConfig {
mut gc_compaction_initial_threshold_kb,
mut gc_compaction_ratio_percent,
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
mut basebackup_cache_enabled,
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -905,6 +917,12 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
patch.sampling_ratio.apply(&mut sampling_ratio);
patch
.relsize_snapshot_cache_capacity
.apply(&mut relsize_snapshot_cache_capacity);
patch
.basebackup_cache_enabled
.apply(&mut basebackup_cache_enabled);
Ok(Self {
checkpoint_distance,
@@ -944,6 +962,8 @@ impl TenantConfig {
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
sampling_ratio,
relsize_snapshot_cache_capacity,
basebackup_cache_enabled,
})
}
@@ -1052,6 +1072,12 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.unwrap_or(global_conf.gc_compaction_ratio_percent),
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
relsize_snapshot_cache_capacity: self
.relsize_snapshot_cache_capacity
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
basebackup_cache_enabled: self
.basebackup_cache_enabled
.unwrap_or(global_conf.basebackup_cache_enabled),
}
}
}

View File

@@ -86,6 +86,27 @@ pub struct DbError {
}
impl DbError {
pub fn new_test_error(code: SqlState, message: String) -> Self {
DbError {
severity: "ERROR".to_string(),
parsed_severity: Some(Severity::Error),
code,
message,
detail: None,
hint: None,
position: None,
where_: None,
schema: None,
table: None,
column: None,
datatype: None,
constraint: None,
file: None,
line: None,
routine: None,
}
}
pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> {
let mut severity = None;
let mut parsed_severity = None;

View File

@@ -30,6 +30,7 @@ crc32c.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true

View File

@@ -0,0 +1,13 @@
[package]
name = "pageserver_page_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
prost.workspace = true
tonic.workspace = true
workspace_hack.workspace = true
[build-dependencies]
tonic-build.workspace = true

View File

@@ -0,0 +1,13 @@
use std::env;
use std::path::PathBuf;
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
/// descriptor set for Protobuf schema reflection.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
.compile_protos(&["proto/page_service.proto"], &["proto"])
.map_err(|err| err.into())
}

View File

@@ -0,0 +1,233 @@
// Page service, presented by pageservers for computes.
//
// This is the compute read path. It primarily serves page versions at given
// LSNs, but also base backups, SLRU segments, and relation metadata.
//
// EXPERIMENTAL: this is still under development and subject to change.
//
// Request metadata headers:
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
//
// The service can be accessed via e.g. grpcurl:
//
// ```
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
// localhost:51051 page_api.PageService/CheckRelExists
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
// However, this will require reconnecting when changing modes.
//
// TODO: write implementation guidance on
// - Health checks
// - Tracing, OpenTelemetry
// - Compression
syntax = "proto3";
package page_api;
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
// Returns the total size of a database, as # of bytes.
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
// Fetches pages.
//
// This is implemented as a bidirectional streaming RPC for performance. Unary
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
// authentication, and so on -- with streaming, we only pay these costs during
// the initial stream setup. This ~doubles throughput in benchmarks. Other
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
}
// The LSN a request should read at.
message ReadLsn {
// The request's read LSN. Required.
uint64 request_lsn = 1;
// If given, the caller guarantees that the page has not been modified since
// this LSN. Must be smaller than or equal to request_lsn. This allows the
// Pageserver to serve an old page without waiting for the request LSN to
// arrive. Valid for all request types.
//
// It is undefined behaviour to make a request such that the page was, in
// fact, modified between request_lsn and not_modified_since_lsn. The
// Pageserver might detect it and return an error, or it might return the old
// page version or the new page version. Setting not_modified_since_lsn equal
// to request_lsn is always safe, but can lead to unnecessary waiting.
uint64 not_modified_since_lsn = 2;
}
// A relation identifier.
message RelTag {
uint32 spc_oid = 1;
uint32 db_oid = 2;
uint32 rel_number = 3;
uint32 fork_number = 4;
}
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}
// Base backup response chunk, returned as an ordered stream.
message GetBaseBackupResponseChunk {
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
// gRPC message size limit.
bytes chunk = 1;
}
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
// shards will error.
message GetDbSizeRequest {
ReadLsn read_lsn = 1;
uint32 db_oid = 2;
}
message GetDbSizeResponse {
uint64 num_bytes = 1;
}
// Requests one or more pages.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
ReadLsn read_lsn = 3;
// The relation to read from.
RelTag rel = 4;
// Page numbers to read. Must belong to the remote shard.
//
// Multiple pages will be executed as a single batch by the Pageserver,
// amortizing layer access costs and parallelizing them. This may increase the
// latency of any individual request, but improves the overall latency and
// throughput of the batch as a whole.
//
// TODO: this causes an allocation in the common single-block case. The sender
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
// into a heap-allocated Vec. Consider optimizing this.
//
// TODO: we might be able to avoid a sort or something if we mandate that these
// are always in order. But we can't currenly rely on this on the server, because
// of compatibility with the libpq protocol handler.
repeated uint32 block_number = 5;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
// A prefetch request. NB: can only be classified on pg < 18.
GET_PAGE_CLASS_PREFETCH = 2;
// A background request (e.g. vacuum).
GET_PAGE_CLASS_BACKGROUND = 3;
}
// A GetPage response.
//
// A batch response will contain all of the requested pages. We could eagerly
// emit individual pages as soon as they are ready, but on a readv() Postgres
// holds buffer pool locks on all pages in the batch and we'll only return once
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
// shard 0, other shards will error.
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message GetRelSizeResponse {
uint32 num_blocks = 1;
}
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
message GetSlruSegmentRequest {
ReadLsn read_lsn = 1;
uint32 kind = 2;
uint32 segno = 3;
}
// Returns an SLRU segment.
//
// These are up 32 pages (256 KB), so we can send them as a single response.
message GetSlruSegmentResponse {
bytes segment = 1;
}

View File

@@ -0,0 +1,19 @@
//! This crate provides the Pageserver's page API. It contains:
//!
//! * proto/page_service.proto: the Protobuf schema for the page API.
//! * proto: auto-generated Protobuf types for gRPC.
//!
//! This crate is used by both the client and the server. Try to keep it slim.
// Code generated by protobuf.
pub mod proto {
tonic::include_proto!("page_api");
/// File descriptor set for Protobuf schema reflection. This allows using
/// e.g. grpcurl with the API.
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("page_api_descriptor");
pub use page_service_client::PageServiceClient;
pub use page_service_server::{PageService, PageServiceServer};
}

View File

@@ -144,7 +144,7 @@ where
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
@@ -343,7 +343,7 @@ where
// Gather non-relational files from object storage pages.
let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.get_slru_keyspace(Version::at(self.lsn), self.ctx)
.await?
.partition(
self.timeline.get_shard_identity(),
@@ -378,7 +378,7 @@ where
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
@@ -517,7 +517,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.get_rel_size(src, Version::at(self.lsn), self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -577,7 +577,7 @@ where
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
if img.len()
@@ -631,7 +631,7 @@ where
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?
.is_empty()
{

View File

@@ -0,0 +1,518 @@
use std::{collections::HashMap, sync::Arc};
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
io::{AsyncWriteExt, BufWriter},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::TenantShardId,
};
use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
task_mgr::TaskKind,
tenant::{
Timeline,
mgr::{TenantManager, TenantSlot},
},
};
pub struct BasebackupPrepareRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub lsn: Lsn,
}
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
/// The main purpose of this cache is to speed up the startup process of compute nodes
/// after scaling to zero.
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
///
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
/// generates a basebackup from the timeline in the background, and stores it on disk.
///
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
remove_entry_sender: BasebackupRemoveEntrySender,
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
cancel: CancellationToken,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
/// Creates a BasebackupCache and spawns the background task.
/// The initialization of the cache is performed in the background and does not
/// block the caller. The cache will return `None` for any get requests until
/// initialization is complete.
pub fn spawn(
runtime_handle: &tokio::runtime::Handle,
data_dir: Utf8PathBuf,
config: Option<BasebackupCacheConfig>,
prepare_receiver: BasebackupPrepareReceiver,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
let enabled = config.is_some();
let cache = Arc::new(BasebackupCache {
data_dir,
config: config.unwrap_or_default(),
tenant_manager,
remove_entry_sender,
entries: std::sync::Mutex::new(HashMap::new()),
cancel,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
});
if enabled {
runtime_handle.spawn(
cache
.clone()
.background(prepare_receiver, remove_entry_receiver),
);
}
cache
}
/// Gets a basebackup entry from the cache.
/// If the entry is found, opens a file with the basebackup archive and returns it.
/// The open file descriptor will prevent the file system from deleting the file
/// even if the entry is removed from the cache in the background.
pub async fn get(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Option<tokio::fs::File> {
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
self.read_miss_count.inc();
return None;
}
let path = self.entry_path(tenant_id, timeline_id, lsn);
match tokio::fs::File::open(path).await {
Ok(file) => {
self.read_hit_count.inc();
Some(file)
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// We may end up here if the basebackup was concurrently removed by the cleanup task.
self.read_miss_count.inc();
} else {
self.read_err_count.inc();
tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
}
None
}
}
}
// Private methods.
fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
// The default format for LSN is 0/ABCDEF.
// The backslash is not filename friendly, so serialize it as plain hex.
let lsn = lsn.0;
format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
}
fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
self.data_dir
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn entry_tmp_path(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.data_dir
.join("tmp")
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
let parts: Vec<&str> = filename
.strip_prefix("basebackup_")?
.strip_suffix(".tar.gz")?
.split('_')
.collect();
if parts.len() != 3 {
return None;
}
let tenant_id = parts[0].parse::<TenantId>().ok()?;
let timeline_id = parts[1].parse::<TimelineId>().ok()?;
let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
Some((tenant_id, timeline_id, lsn))
}
async fn cleanup(&self) -> anyhow::Result<()> {
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
}
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
let mut entries_new = HashMap::new();
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
if !tenant_shard_id.is_shard_zero() {
continue;
}
let TenantSlot::Attached(tenant) = tenant_slot else {
continue;
};
let tenant_id = tenant_shard_id.tenant_id;
for timeline in tenant.list_timelines() {
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
if let Some(&entry_lsn) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry_lsn {
entries_new.insert(tti, entry_lsn);
}
}
}
}
for (&tti, &lsn) in entries_old.iter() {
if !entries_new.contains_key(&tti) {
self.remove_entry_sender
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
.unwrap();
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
*self.entries.lock().unwrap() = entries_new;
Ok(())
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
while let Some(dir_entry) = dir.next_entry().await? {
let filename = dir_entry.file_name();
if filename == "tmp" {
// Skip the tmp directory.
continue;
}
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
let Some((tenant_id, timeline_id, lsn)) = parsed else {
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
continue;
};
let tti = TenantTimelineId::new(tenant_id, timeline_id);
use std::collections::hash_map::Entry::*;
match entries.entry(tti) {
Occupied(mut entry) => {
let entry_lsn = *entry.get();
// Leave only the latest entry, remove the old one.
if lsn < entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
lsn,
))?;
} else if lsn > entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
entry_lsn,
))?;
entry.insert(lsn);
} else {
// Two different filenames parsed to the same timline_id and LSN.
// Should never happen.
return Err(anyhow::anyhow!(
"Duplicate basebackup cache entry with the same LSN: {:?}",
filename
));
}
}
Vacant(entry) => {
entry.insert(lsn);
}
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
*self.entries.lock().unwrap() = entries;
Ok(())
}
async fn background(
self: Arc<Self>,
mut prepare_receiver: BasebackupPrepareReceiver,
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
) {
// Panic in the background is a safe fallback.
// It will drop receivers and the cache will be effectively disabled.
self.on_startup()
.await
.expect("Failed to initialize basebackup cache");
let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
Some(req) = prepare_receiver.recv() => {
if let Err(err) = self.prepare_basebackup(
req.tenant_shard_id,
req.timeline_id,
req.lsn,
).await {
tracing::info!("Failed to prepare basebackup: {:#}", err);
self.prepare_err_count.inc();
continue;
}
}
Some(req) = remove_entry_receiver.recv() => {
if let Err(e) = tokio::fs::remove_file(req).await {
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
}
}
_ = cleanup_ticker.tick() => {
self.cleanup().await.unwrap_or_else(|e| {
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
});
}
_ = self.cancel.cancelled() => {
tracing::info!("BasebackupCache background task cancelled");
break;
}
}
}
}
/// Prepare a basebackup for the given timeline.
///
/// If the basebackup already exists with a higher LSN or the timeline already
/// has a higher last_record_lsn, skip the preparation.
///
/// The basebackup is prepared in a temporary directory and then moved to the final
/// location to make the operation atomic.
async fn prepare_basebackup(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
req_lsn: Lsn,
) -> anyhow::Result<()> {
tracing::info!(
tenant_id = %tenant_shard_id.tenant_id,
%timeline_id,
%req_lsn,
"Preparing basebackup for timeline",
);
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
{
let entries = self.entries.lock().unwrap();
if let Some(&entry_lsn) = entries.get(&tti) {
if entry_lsn >= req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%entry_lsn,
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
if entries.len() as i64 >= self.config.max_size_entries {
tracing::info!(
%timeline_id,
%req_lsn,
"Basebackup cache is full, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
let tenant = self
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let tenant_state = tenant.current_state();
if tenant_state != TenantState::Active {
anyhow::bail!(
"Tenant {} is not active, current state: {:?}",
tenant_shard_id.tenant_id,
tenant_state
)
}
let timeline = tenant.get_timeline(timeline_id, true)?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn > req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%last_record_lsn,
"Timeline has a higher LSN than the requested one, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
let res = self
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
.await;
if let Err(err) = res {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
}
return Err(err);
}
// Move the tmp file to the final location atomically.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
let mut entries = self.entries.lock().unwrap();
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
// Remove the old entry if it exists.
self.remove_entry_sender
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
.unwrap();
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
self.prepare_ok_count.inc();
Ok(())
}
/// Prepares a basebackup in a temporary file.
async fn prepare_basebackup_tmp(
&self,
emptry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
async_compression::Level::Best,
);
// We may receive a request before the WAL record is applied to the timeline.
// Wait for the requested LSN to be applied.
timeline
.wait_lsn(
req_lsn,
crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
crate::tenant::timeline::WaitLsnTimeout::Default,
&ctx,
)
.await?;
send_basebackup_tarball(
&mut encoder,
timeline,
Some(req_lsn),
None,
false,
false,
&ctx,
)
.await?;
encoder.shutdown().await?;
writer.flush().await?;
writer.into_inner().sync_all().await?;
Ok(())
}
}

View File

@@ -16,6 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
use pageserver::basebackup_cache::BasebackupCache;
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
@@ -541,6 +542,8 @@ fn start_pageserver(
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
// Scan the local 'tenants/' directory and start loading the tenants
let (basebackup_prepare_sender, basebackup_prepare_receiver) =
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -551,12 +554,22 @@ fn start_pageserver(
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
},
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),
conf.basebackup_cache_dir(),
conf.basebackup_cache_config.clone(),
basebackup_prepare_receiver,
Arc::clone(&tenant_manager),
shutdown_pageserver.child_token(),
);
BACKGROUND_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
@@ -763,6 +776,7 @@ fn start_pageserver(
} else {
None
},
basebackup_cache,
);
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -232,6 +232,8 @@ pub struct PageServerConf {
pub dev_mode: bool,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
}
/// Token for authentication to safekeepers
@@ -261,6 +263,10 @@ impl PageServerConf {
self.workdir.join("metadata.json")
}
pub fn basebackup_cache_dir(&self) -> Utf8PathBuf {
self.workdir.join("basebackup_cache")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
@@ -407,6 +413,7 @@ impl PageServerConf {
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
basebackup_cache_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -461,6 +468,7 @@ impl PageServerConf {
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
basebackup_cache_config,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -544,6 +552,23 @@ impl PageServerConf {
ratio.numerator, ratio.denominator
)
);
let url = Url::parse(&tracing_config.export_config.endpoint)
.map_err(anyhow::Error::msg)
.with_context(|| {
format!(
"tracing endpoint URL is invalid : {}",
tracing_config.export_config.endpoint
)
})?;
ensure!(
url.scheme() == "http" || url.scheme() == "https",
format!(
"tracing endpoint URL must start with http:// or https://: {}",
tracing_config.export_config.endpoint
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
@@ -660,4 +685,25 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
#[test]
fn test_config_tracing_endpoint_is_invalid() {
let input = r#"
control_plane_api = "http://localhost:6666"
[tracing]
sampling_ratio = { numerator = 1, denominator = 0 }
[tracing.export_config]
endpoint = "localhost:4317"
protocol = "http-binary"
timeout = "1ms"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("config has valid fields");
let workdir = Utf8PathBuf::from("/nonexistent");
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect_err("parse_and_validate should fail for endpoint without scheme");
}
}

View File

@@ -18,12 +18,25 @@ use crate::tenant::timeline::logical_size::CurrentLogicalSize;
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
/// Timeline last_record_lsn, absolute
/// Timeline last_record_lsn, absolute.
#[serde(rename = "written_size")]
WrittenSize,
/// Timeline last_record_lsn, incremental
#[serde(rename = "written_data_bytes_delta")]
WrittenSizeDelta,
/// Written bytes only on this timeline (not including ancestors):
/// written_size - ancestor_lsn
///
/// On the root branch, this is equivalent to `written_size`.
#[serde(rename = "written_size_since_parent")]
WrittenSizeSinceParent,
/// PITR history size only on this timeline (not including ancestors):
/// last_record_lsn - max(pitr_cutoff, ancestor_lsn).
///
/// On the root branch, this is its entire PITR history size. Not emitted if GC hasn't computed
/// the PITR cutoff yet. 0 if PITR is disabled.
#[serde(rename = "pitr_history_size_since_parent")]
PitrHistorySizeSinceParent,
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
@@ -157,6 +170,32 @@ impl MetricsKey {
.incremental_values()
}
/// `written_size` - `ancestor_lsn`.
const fn written_size_since_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSizeSinceParent,
}
.absolute_values()
}
/// `written_size` - max(`pitr_cutoff`, `ancestor_lsn`).
const fn pitr_history_size_since_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::PitrHistorySizeSinceParent,
}
.absolute_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
@@ -334,7 +373,13 @@ impl TenantSnapshot {
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
ancestor_lsn: Lsn,
current_exact_logical_size: Option<u64>,
/// Whether PITR is enabled (pitr_interval > 0).
pitr_enabled: bool,
/// The PITR cutoff LSN. None if not yet initialized. If PITR is disabled, this is approximately
/// Some(last_record_lsn), but may lag behind it since it's computed periodically.
pitr_cutoff: Option<Lsn>,
}
impl TimelineSnapshot {
@@ -354,6 +399,9 @@ impl TimelineSnapshot {
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
let ancestor_lsn = t.get_ancestor_lsn();
let pitr_enabled = !t.get_pitr_interval().is_zero();
let pitr_cutoff = t.gc_info.read().unwrap().cutoffs.time;
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
@@ -373,7 +421,10 @@ impl TimelineSnapshot {
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
ancestor_lsn,
current_exact_logical_size,
pitr_enabled,
pitr_cutoff,
}))
}
}
@@ -424,6 +475,8 @@ impl TimelineSnapshot {
let up_to = now;
let written_size_last = written_size_now.value.max(prev.1); // don't regress
if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
// written_size_delta
@@ -441,6 +494,27 @@ impl TimelineSnapshot {
});
}
// Compute the branch-local written size.
let written_size_since_parent_key =
MetricsKey::written_size_since_parent(tenant_id, timeline_id);
metrics.push(
written_size_since_parent_key
.at(now, written_size_last.saturating_sub(self.ancestor_lsn.0)),
);
// Compute the branch-local PITR history size. Not emitted if GC hasn't yet computed the
// PITR cutoff. 0 if PITR is disabled.
let pitr_history_size_since_parent_key =
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id);
if !self.pitr_enabled {
metrics.push(pitr_history_size_since_parent_key.at(now, 0));
} else if let Some(pitr_cutoff) = self.pitr_cutoff {
metrics.push(pitr_history_size_since_parent_key.at(
now,
written_size_last.saturating_sub(pitr_cutoff.max(self.ancestor_lsn).0),
));
}
{
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
let current_or_previous = self

View File

@@ -12,12 +12,17 @@ fn startup_collected_timeline_metrics_before_advancing() {
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
};
let now = DateTime::<Utc>::from(SystemTime::now());
@@ -33,7 +38,11 @@ fn startup_collected_timeline_metrics_before_advancing() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
]
);
}
@@ -49,7 +58,9 @@ fn startup_collected_timeline_metrics_second_round() {
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let mut metrics = Vec::new();
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
@@ -59,7 +70,10 @@ fn startup_collected_timeline_metrics_second_round() {
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -69,7 +83,11 @@ fn startup_collected_timeline_metrics_second_round() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
]
);
}
@@ -86,7 +104,9 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let mut metrics = Vec::new();
let cache = HashMap::from([
@@ -103,7 +123,10 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -113,16 +136,18 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
]
);
}
/// Tests that written sizes do not regress across restarts.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
// should never go backwards
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
@@ -140,7 +165,10 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(20)),
};
let mut cache = HashMap::from([
@@ -169,6 +197,8 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 100),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 80),
]
);
@@ -183,6 +213,157 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 100),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 80),
]
);
}
/// Tests that written sizes do not regress across restarts, even on child branches.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(40),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(20)),
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id)
.at(before_restart, 100)
.to_kv_pair(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
)
.to_kv_pair(),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 60),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 60),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
]
);
}
/// Tests that written sizes do not regress across restarts, even on child branches and
/// with a PITR cutoff after the branch point.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn_and_pitr_cutoff() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(30),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(40)),
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id)
.at(before_restart, 100)
.to_kv_pair(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
)
.to_kv_pair(),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 70),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 70),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
]
);
}
@@ -201,7 +382,10 @@ fn post_restart_current_exact_logical_size_uses_cached() {
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: None,
};
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
@@ -286,16 +470,101 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
times
}
/// Tests that disabled PITR history does not yield any history size, even when the PITR cutoff
/// indicates otherwise.
#[test]
fn pitr_disabled_yields_no_history_size() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: false,
pitr_cutoff: Some(pitr_cutoff),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
]
);
}
/// Tests that uninitialized PITR cutoff does not emit any history size metric at all.
#[test]
fn pitr_uninitialized_does_not_emit_history_size() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: None,
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
]
);
}
pub(crate) const fn metric_examples_old(
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [RawMetric; 5] {
) -> [RawMetric; 7] {
[
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until_old_format(before, now, 0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
@@ -307,10 +576,12 @@ pub(crate) const fn metric_examples(
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [NewRawMetric; 5] {
) -> [NewRawMetric; 7] {
[
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
MetricsKey::synthetic_size(tenant_id).at(now, 1),

View File

@@ -513,6 +513,14 @@ mod tests {
line!(),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_history_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
@@ -560,7 +568,7 @@ mod tests {
assert_eq!(upgraded_samples, new_samples);
}
fn metric_samples_old() -> [RawMetric; 5] {
fn metric_samples_old() -> [RawMetric; 7] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
@@ -572,7 +580,7 @@ mod tests {
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
}
fn metric_samples() -> [NewRawMetric; 5] {
fn metric_samples() -> [NewRawMetric; 7] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);

View File

@@ -449,7 +449,7 @@ async fn build_timeline_info_common(
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
let min_readable_lsn = std::cmp::max(
timeline.get_gc_cutoff_lsn(),
timeline.get_gc_cutoff_lsn().unwrap_or_default(),
*timeline.get_applied_gc_cutoff_lsn(),
);
@@ -3199,7 +3199,7 @@ async fn list_aux_files(
.await?;
let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
state.conf.get_vectored_concurrent_io,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);

View File

@@ -3,6 +3,7 @@
mod auth;
pub mod basebackup;
pub mod basebackup_cache;
pub mod config;
pub mod consumption_metrics;
pub mod context;

View File

@@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_cache_entries",
"Number of entries in the relation size cache",
"pageserver_relsize_latest_cache_entries",
"Number of entries in the latest relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
.expect("failed to define a metric")
pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_latest_cache_hits",
"Latest relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_cache_misses",
"Relation size cache misses",
"pageserver_relsize_latest_cache_misses",
"Relation size latest cache misses",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_snapshot_cache_entries",
"Number of entries in the pitr relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_hits",
"Pitr relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_misses",
"Relation size snapshot cache misses",
)
.expect("failed to define a metric")
});
@@ -1039,6 +1066,15 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
});
pub(crate) static TENANT_OFFLOADED_TIMELINES: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_offloaded_timelines",
"Number of offloaded timelines of a tenant",
&["tenant_id", "shard_id"]
)
.expect("Failed to register pageserver_tenant_offloaded_timelines metric")
});
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_eviction_iteration_duration_seconds_global",
@@ -3524,11 +3560,14 @@ impl TimelineMetrics {
}
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
let tid = tenant_shard_id.tenant_id.to_string();
let shard_id = tenant_shard_id.shard_slug().to_string();
// Only shard zero deals in synthetic sizes
if tenant_shard_id.is_shard_zero() {
let tid = tenant_shard_id.tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
}
let _ = TENANT_OFFLOADED_TIMELINES.remove_label_values(&[&tid, &shard_id]);
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
@@ -4320,6 +4359,42 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
pub(crate) static BASEBACKUP_CACHE_READ: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_basebackup_cache_read_total",
"Number of read accesses to the basebackup cache grouped by hit/miss/error",
&["result"]
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_basebackup_cache_prepare_total",
"Number of prepare requests processed by the basebackup cache grouped by ok/skip/error",
&["result"]
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_entries_total",
"Number of entries in the basebackup cache"
)
.expect("failed to define a metric")
});
// FIXME: Support basebackup cache size metrics.
#[allow(dead_code)]
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_size_bytes",
"Total size of all basebackup cache entries on disk in bytes"
)
.expect("failed to define a metric")
});
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",

View File

@@ -9,7 +9,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
@@ -18,7 +17,7 @@ use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
@@ -52,8 +51,10 @@ use utils::simple_rcu::RcuReadGuard;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use crate::PERF_TRACE_TARGET;
use crate::auth::check_permission;
use crate::basebackup::BasebackupError;
use crate::basebackup_cache::BasebackupCache;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -62,7 +63,7 @@ use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
@@ -107,6 +108,7 @@ pub fn spawn(
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> Listener {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
@@ -128,6 +130,7 @@ pub fn spawn(
conf.pg_auth_type,
tls_config,
conf.page_service_pipelining.clone(),
basebackup_cache,
libpq_ctx,
cancel.clone(),
)
@@ -186,6 +189,7 @@ pub async fn libpq_listener_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -229,6 +233,7 @@ pub async fn libpq_listener_main(
auth_type,
tls_config.clone(),
pipelining_config.clone(),
Arc::clone(&basebackup_cache),
connection_ctx,
connections_cancel.child_token(),
gate_guard,
@@ -271,6 +276,7 @@ async fn page_service_conn_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -331,11 +337,12 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
basebackup_cache,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -371,7 +378,6 @@ async fn page_service_conn_main(
}
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -389,6 +395,9 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
basebackup_cache: Arc<BasebackupCache>,
gate_guard: GateGuard,
}
@@ -642,7 +651,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
lsn_range: LsnRange,
ctx: RequestContext,
}
@@ -764,12 +773,12 @@ impl BatchedFeMessage {
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
if last_in_batch.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
accum_lsn = %last_in_batch.lsn_range.effective_lsn,
this_lsn = %this_pages[0].lsn_range.effective_lsn,
"stopping batching because LSN changed"
);
@@ -784,15 +793,15 @@ impl BatchedFeMessage {
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
&& batched.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
lsn=%this_pages[0].lsn_range.effective_lsn,
"stopping batching because same page was requested at different LSNs"
);
@@ -844,17 +853,17 @@ impl BatchedFeMessage {
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
@@ -862,6 +871,8 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
get_vectored_concurrent_io,
basebackup_cache,
gate_guard,
}
}
@@ -1158,7 +1169,7 @@ impl PageServerHandler {
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::effective_request_lsn(
let effective_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
@@ -1177,7 +1188,10 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
lsn_range: LsnRange {
effective_lsn,
request_lsn: req.hdr.request_lsn
},
ctx,
}],
// The executor grabs the batch when it becomes idle.
@@ -1278,7 +1292,7 @@ impl PageServerHandler {
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
async fn pagestream_handle_batched_message<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
@@ -1623,7 +1637,7 @@ impl PageServerHandler {
}
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.get_vectored_concurrent_io,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {
@@ -1733,7 +1747,7 @@ impl PageServerHandler {
};
let result = self
.pagesteam_handle_batched_message(
.pagestream_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
@@ -1909,7 +1923,7 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
self.pagestream_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
@@ -2127,7 +2141,14 @@ impl PageServerHandler {
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.get_rel_exists(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -2154,7 +2175,14 @@ impl PageServerHandler {
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.get_rel_size(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -2181,7 +2209,15 @@ impl PageServerHandler {
.await?;
let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -2214,7 +2250,7 @@ impl PageServerHandler {
// Ignore error (trace buffer may be full or tracer may have disconnected).
_ = page_trace.try_send(PageTraceEvent {
key,
effective_lsn: batch.effective_request_lsn,
effective_lsn: batch.lsn_range.effective_lsn,
time,
});
}
@@ -2229,7 +2265,7 @@ impl PageServerHandler {
perf_instrument = true;
}
req.effective_request_lsn
req.lsn_range.effective_lsn
})
.max()
.expect("batch is never empty");
@@ -2283,7 +2319,7 @@ impl PageServerHandler {
(
&p.req.rel,
&p.req.blkno,
p.effective_request_lsn,
p.lsn_range,
p.ctx.attached_child(),
)
}),
@@ -2468,6 +2504,8 @@ impl PageServerHandler {
.map_err(QueryError::Disconnected)?;
self.flush_cancellable(pgb, &self.cancel).await?;
let mut from_cache = false;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
if full_backup {
@@ -2485,7 +2523,33 @@ impl PageServerHandler {
.map_err(map_basebackup_error)?;
} else {
let mut writer = BufWriter::new(pgb.copyout_writer());
if gzip {
let cached = {
// Basebackup is cached only for this combination of parameters.
if timeline.is_basebackup_cache_enabled()
&& gzip
&& lsn.is_some()
&& prev_lsn.is_none()
{
self.basebackup_cache
.get(tenant_id, timeline_id, lsn.unwrap())
.await
} else {
None
}
};
if let Some(mut cached) = cached {
from_cache = true;
tokio::io::copy(&mut cached, &mut writer)
.await
.map_err(|e| {
map_basebackup_error(BasebackupError::Client(
e,
"handle_basebackup_request,cached,copy",
))
})?;
} else if gzip {
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// NOTE using fast compression because it's on the critical path
@@ -2544,6 +2608,7 @@ impl PageServerHandler {
info!(
lsn_await_millis = lsn_awaited_after.as_millis(),
basebackup_millis = basebackup_after.as_millis(),
%from_cache,
"basebackup complete"
);

View File

@@ -43,7 +43,9 @@ use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
@@ -90,6 +92,28 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
/// Each request to page server contains LSN range: `not_modified_since..request_lsn`.
/// See comments libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`.
/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
pub effective_lsn: Lsn,
pub request_lsn: Lsn,
}
impl LsnRange {
pub fn at(lsn: Lsn) -> LsnRange {
LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
}
}
pub fn is_latest(&self) -> bool {
self.request_lsn == Lsn::MAX
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
#[error("cancelled")]
@@ -202,13 +226,13 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
Version::LsnRange(lsns) => {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| {
(tag, blknum, effective_lsn, ctx.attached_child())
}),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
io_concurrency.clone(),
ctx,
)
@@ -246,7 +270,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -265,7 +289,7 @@ impl Timeline {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -274,7 +298,7 @@ impl Timeline {
slots_filled += 1;
continue;
}
let lsn = lsns.effective_lsn;
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
@@ -289,7 +313,7 @@ impl Timeline {
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
@@ -470,7 +494,7 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(nblocks);
}
@@ -488,7 +512,7 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
self.update_cached_rel_size(tag, version, nblocks);
Ok(nblocks)
}
@@ -510,7 +534,7 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(true);
}
// then check if the database was already initialized.
@@ -586,7 +610,7 @@ impl Timeline {
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -632,7 +656,7 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -645,7 +669,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -867,11 +891,11 @@ impl Timeline {
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
.list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
.await?
{
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
.get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -885,7 +909,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -1137,7 +1161,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.await?
{
if self.cancel.is_cancelled() {
@@ -1212,7 +1236,7 @@ impl Timeline {
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
.await?
.into_iter()
.collect();
@@ -1329,59 +1353,75 @@ impl Timeline {
Ok((dense_keyspace, sparse_keyspace))
}
/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
/// Get cached size of relation. There are two caches: one for primary updates, it captures the latest state of
/// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size
/// at the particular LSN (snapshot).
pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
let lsn = version.get_lsn();
{
let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_LATEST_CACHE_HITS.inc();
return Some(*nblocks);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES.inc();
{
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
RELSIZE_SNAPSHOT_CACHE_HITS.inc();
return Some(*nblock);
}
}
if version.is_latest() {
RELSIZE_LATEST_CACHE_MISSES.inc();
} else {
RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
}
None
}
/// Update cached relation size if there is no more recent update
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if lsn < rel_size_cache.complete_as_of {
// Do not cache old values. It's safe to cache the size on read, as long as
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
// never evict values from the cache, so if the relation size changed after
// 'lsn', the new value is already in the cache.
return;
}
match rel_size_cache.map.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
let lsn = version.get_lsn();
if version.is_latest() {
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
match rel_size_cache.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_CACHE_ENTRIES.inc();
} else {
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if rel_size_cache.capacity() != 0 {
rel_size_cache.insert((lsn, tag), nblocks);
RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
}
}
}
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_CACHE_ENTRIES.inc();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
RELSIZE_CACHE_ENTRIES.dec();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.remove(tag).is_some() {
RELSIZE_LATEST_CACHE_ENTRIES.dec();
}
}
}
@@ -1585,7 +1625,10 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(nblocks) = self
.tline
.get_cached_rel_size(&rel, Version::Modified(self))
{
Ok(nblocks)
} else if !self
.tline
@@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats {
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
LsnRange(LsnRange),
Modified(&'a DatadirModification<'a>),
}
@@ -2679,7 +2722,7 @@ impl Version<'_> {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
Version::Modified(modification) => modification.get(key, ctx).await,
}
}
@@ -2701,12 +2744,26 @@ impl Version<'_> {
}
}
fn get_lsn(&self) -> Lsn {
pub fn is_latest(&self) -> bool {
match self {
Version::Lsn(lsn) => *lsn,
Version::LsnRange(lsns) => lsns.is_latest(),
Version::Modified(_) => true,
}
}
pub fn get_lsn(&self) -> Lsn {
match self {
Version::LsnRange(lsns) => lsns.effective_lsn,
Version::Modified(modification) => modification.lsn,
}
}
pub fn at(lsn: Lsn) -> Self {
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
})
}
}
//--- Metadata structs stored in key-value pairs in the repository.

View File

@@ -380,6 +380,10 @@ pub enum TaskKind {
DetachAncestor,
ImportPgdata,
/// Background task of [`crate::basebackup_cache::BasebackupCache`].
/// Prepares basebackups and clears outdated entries.
BasebackupCache,
}
#[derive(Default)]

View File

@@ -78,6 +78,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit
use self::timeline::{
EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError,
};
use crate::basebackup_cache::BasebackupPrepareSender;
use crate::config::PageServerConf;
use crate::context;
use crate::context::RequestContextBuilder;
@@ -86,8 +87,8 @@ use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_STATE_METRIC,
TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
@@ -157,6 +158,7 @@ pub struct TenantSharedResources {
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
}
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
@@ -317,12 +319,15 @@ pub struct TenantShard {
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Option<Arc<WalRedoManager>>,
// provides access to timeline data sitting in the remote storage
/// Provides access to timeline data sitting in the remote storage.
pub(crate) remote_storage: GenericRemoteStorage,
// Access to global deletion queue for when this tenant wants to schedule a deletion
/// Access to global deletion queue for when this tenant wants to schedule a deletion.
deletion_queue_client: DeletionQueueClient,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
/// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -1286,6 +1291,7 @@ impl TenantShard {
remote_storage,
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
@@ -1301,6 +1307,7 @@ impl TenantShard {
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
));
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -3348,6 +3355,13 @@ impl TenantShard {
activated_timelines += 1;
}
let tid = self.tenant_shard_id.tenant_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let offloaded_timeline_count = timelines_offloaded_accessor.len();
TENANT_OFFLOADED_TIMELINES
.with_label_values(&[&tid, &shard_id])
.set(offloaded_timeline_count as u64);
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
@@ -4232,6 +4246,7 @@ impl TenantShard {
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
basebackup_prepare_sender: BasebackupPrepareSender,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
@@ -4335,6 +4350,7 @@ impl TenantShard {
ongoing_timeline_detach: std::sync::Mutex::default(),
gc_block: Default::default(),
l0_flush_global_state,
basebackup_prepare_sender,
}
}
@@ -4587,7 +4603,7 @@ impl TenantShard {
target.cutoffs = GcCutoffs {
space: space_cutoff,
time: Lsn::INVALID,
time: None,
};
}
}
@@ -4671,7 +4687,7 @@ impl TenantShard {
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
target.within_ancestor_pitr =
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.time;
Some(timeline.get_ancestor_lsn()) >= ancestor_gc_cutoffs.time;
}
}
@@ -4684,13 +4700,15 @@ impl TenantShard {
} else {
0
});
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(target.cutoffs.time)
.unwrap_or(Lsn(0))
.0,
);
if let Some(time_cutoff) = target.cutoffs.time {
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(time_cutoff)
.unwrap_or_default()
.0,
);
}
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
// - this timeline was created while we were finding cutoffs
@@ -4699,8 +4717,8 @@ impl TenantShard {
let original_cutoffs = target.cutoffs.clone();
// GC cutoffs should never go back
target.cutoffs = GcCutoffs {
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
space: cutoffs.space.max(original_cutoffs.space),
time: cutoffs.time.max(original_cutoffs.time),
}
}
}
@@ -5252,6 +5270,7 @@ impl TenantShard {
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
}
}
@@ -5560,6 +5579,14 @@ impl TenantShard {
}
}
// Update metrics
let tid = self.tenant_shard_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let set_key = &[tid.as_str(), shard_id.as_str()][..];
TENANT_OFFLOADED_TIMELINES
.with_label_values(set_key)
.set(manifest.offloaded_timelines.len() as u64);
// Upload the manifest. Remote storage does no retries internally, so retry here.
match backoff::retry(
|| async {
@@ -5826,6 +5853,8 @@ pub(crate) mod harness {
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel();
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
self.conf,
@@ -5843,6 +5872,7 @@ pub(crate) mod harness {
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
basebackup_requst_sender,
));
let preload = tenant
@@ -8596,8 +8626,10 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let io_concurrency = IoConcurrency::spawn_from_conf(
tline.conf.get_vectored_concurrent_io,
tline.gate.enter().unwrap(),
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline
@@ -8935,7 +8967,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
guard.cutoffs.time = Some(Lsn(0x30));
guard.cutoffs.space = Lsn(0x30);
}
@@ -9043,7 +9075,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -9461,7 +9493,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -9545,7 +9577,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -10016,7 +10048,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10079,7 +10111,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10157,7 +10189,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.time = Some(Lsn(0x38));
guard.cutoffs.space = Lsn(0x38);
}
tline
@@ -10265,7 +10297,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10328,7 +10360,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10514,7 +10546,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
time: Some(Lsn(0x10)),
space: Lsn(0x10),
},
leases: Default::default(),
@@ -10534,7 +10566,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
time: Some(Lsn(0x50)),
space: Lsn(0x50),
},
leases: Default::default(),
@@ -11255,7 +11287,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11644,7 +11676,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11707,7 +11739,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -11896,7 +11928,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11959,7 +11991,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -12222,7 +12254,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),

View File

@@ -235,7 +235,7 @@ pub(super) async fn gather_inputs(
// than our internal space cutoff. This means that if someone drops a database and waits for their
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
// the space cutoff.
let mut next_pitr_cutoff = gc_info.cutoffs.time;
let mut next_pitr_cutoff = gc_info.cutoffs.time.unwrap_or_default(); // TODO: handle None
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {

View File

@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
@@ -318,11 +318,10 @@ impl IoConcurrency {
}
pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};

View File

@@ -63,7 +63,28 @@ pub struct InMemoryLayer {
opened_at: Instant,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
///
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +102,6 @@ impl std::fmt::Debug for InMemoryLayer {
}
pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
@@ -105,7 +121,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
/// See [`InMemoryLayer::index`].
///
/// For memory efficiency, the data is packed into a u64.
///
@@ -425,7 +441,7 @@ impl InMemoryLayer {
.page_content_kind(PageContentKind::InMemoryLayer)
.attached_child();
let inner = self.inner.read().await;
let index = self.index.read().await;
struct ValueRead {
entry_lsn: Lsn,
@@ -435,10 +451,7 @@ impl InMemoryLayer {
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
.index
.range(range.start.to_compact()..range.end.to_compact())
{
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
let key = Key::from_compact(*key);
let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +479,7 @@ impl InMemoryLayer {
}
}
}
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
@@ -573,8 +586,8 @@ impl InMemoryLayer {
start_lsn,
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
index: BTreeMap::new(),
file,
resource_units: GlobalResourceUnits::new(),
}),
@@ -592,31 +605,39 @@ impl InMemoryLayer {
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
// Update the index with the new entries
let mut index = self.index.write().await;
for meta in metadata {
let SerializedValueMeta {
key,
@@ -639,7 +660,7 @@ impl InMemoryLayer {
will_init,
})?;
let vec_map = inner.index.entry(key).or_default();
let vec_map = index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,8 +679,6 @@ impl InMemoryLayer {
);
}
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
@@ -680,6 +699,18 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
///
/// A note on locking:
/// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
/// writes while freezing the layer. This is enforced at a higher level via
/// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
/// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
/// Timeline::write_lock for its lifetime. The rolling is handled in
/// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function
/// so can't be called from different threads.
/// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
/// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
/// hence there can be no concurrent writes
pub async fn freeze(&self, end_lsn: Lsn) {
assert!(
self.start_lsn < end_lsn,
@@ -700,8 +731,8 @@ impl InMemoryLayer {
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
let index = self.index.read().await;
for vec_map in index.values() {
for (lsn, _) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
@@ -724,14 +755,11 @@ impl InMemoryLayer {
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
// `freeze`, and then `write_to_disk` on it. When the thread gets the
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
@@ -743,13 +771,9 @@ impl InMemoryLayer {
let key_count = if let Some(key_range) = key_range {
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
inner
.index
.iter()
.filter(|(k, _)| key_range.contains(k))
.count()
index.iter().filter(|(k, _)| key_range.contains(k)).count()
} else {
inner.index.len()
index.len()
};
if key_count == 0 {
return Ok(None);
@@ -772,7 +796,7 @@ impl InMemoryLayer {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {
for (key, vec_map) in index.iter() {
// Write all page versions
for (lsn, entry) in vec_map
.as_slice()

View File

@@ -14,6 +14,7 @@ pub mod span;
pub mod uninit;
mod walreceiver;
use hashlink::LruCache;
use std::array;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -23,8 +24,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -93,10 +92,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
use super::upload_queue::NotInitialized;
use super::{
AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded,
debug_assert_current_span_has_tenant_and_timeline_id,
};
use crate::PERF_TRACE_TARGET;
use crate::aux_file::AuxFileSizeEstimator;
use crate::basebackup_cache::BasebackupPrepareRequest;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -130,6 +131,7 @@ use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::walingest::WalLagCooldown;
use crate::walredo::RedoAttemptType;
use crate::{ZERO_PAGE, task_mgr, walredo};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -195,16 +197,7 @@ pub struct TimelineResources {
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
pub basebackup_prepare_sender: BasebackupPrepareSender,
}
pub struct Timeline {
@@ -365,7 +358,8 @@ pub struct Timeline {
pub walreceiver: Mutex<Option<WalReceiver>>,
/// Relation size cache
pub(crate) rel_size_cache: RwLock<RelSizeCache>,
pub(crate) rel_size_latest_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
pub(crate) rel_size_snapshot_cache: Mutex<LruCache<(Lsn, RelTag), BlockNumber>>,
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
@@ -447,6 +441,9 @@ pub struct Timeline {
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
wait_lsn_log_slow: tokio::sync::Semaphore,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
}
pub(crate) enum PreviousHeatmap {
@@ -537,29 +534,24 @@ impl GcInfo {
/// The `GcInfo` component describing which Lsns need to be retained. Functionally, this
/// is a single number (the oldest LSN which we must retain), but it internally distinguishes
/// between time-based and space-based retention for observability and consumption metrics purposes.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, Default)]
pub(crate) struct GcCutoffs {
/// Calculated from the [`pageserver_api::models::TenantConfig::gc_horizon`], this LSN indicates how much
/// history we must keep to retain a specified number of bytes of WAL.
pub(crate) space: Lsn,
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates how much
/// history we must keep to enable reading back at least the PITR interval duration.
pub(crate) time: Lsn,
}
impl Default for GcCutoffs {
fn default() -> Self {
Self {
space: Lsn::INVALID,
time: Lsn::INVALID,
}
}
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates
/// how much history we must keep to enable reading back at least the PITR interval duration.
///
/// None indicates that the PITR cutoff has not been computed. A PITR interval of 0 will yield
/// Some(last_record_lsn).
pub(crate) time: Option<Lsn>,
}
impl GcCutoffs {
fn select_min(&self) -> Lsn {
std::cmp::min(self.space, self.time)
// NB: if we haven't computed the PITR cutoff yet, we can't GC anything.
self.space.min(self.time.unwrap_or_default())
}
}
@@ -1041,6 +1033,7 @@ pub(crate) enum WaitLsnWaiter<'a> {
Tenant,
PageService,
HttpEndpoint,
BaseBackupCache,
}
/// Argument to [`Timeline::shutdown`].
@@ -1096,11 +1089,14 @@ impl Timeline {
/// Get the bytes written since the PITR cutoff on this branch, and
/// whether this branch's ancestor_lsn is within its parent's PITR.
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
// TODO: for backwards compatibility, we return the full history back to 0 when the PITR
// cutoff has not yet been initialized. This should return None instead, but this is exposed
// in external HTTP APIs and callers may not handle a null value.
let gc_info = self.gc_info.read().unwrap();
let history = self
.get_last_record_lsn()
.checked_sub(gc_info.cutoffs.time)
.unwrap_or(Lsn(0))
.checked_sub(gc_info.cutoffs.time.unwrap_or_default())
.unwrap_or_default()
.0;
(history, gc_info.within_ancestor_pitr)
}
@@ -1110,9 +1106,10 @@ impl Timeline {
self.applied_gc_cutoff_lsn.read()
}
/// Read timeline's planned GC cutoff: this is the logical end of history that users
/// are allowed to read (based on configured PITR), even if physically we have more history.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
/// Read timeline's planned GC cutoff: this is the logical end of history that users are allowed
/// to read (based on configured PITR), even if physically we have more history. Returns None
/// if the PITR cutoff has not yet been initialized.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Option<Lsn> {
self.gc_info.read().unwrap().cutoffs.time
}
@@ -1563,7 +1560,8 @@ impl Timeline {
}
WaitLsnWaiter::Tenant
| WaitLsnWaiter::PageService
| WaitLsnWaiter::HttpEndpoint => unreachable!(
| WaitLsnWaiter::HttpEndpoint
| WaitLsnWaiter::BaseBackupCache => unreachable!(
"tenant or page_service context are not expected to have task kind {:?}",
ctx.task_kind()
),
@@ -2468,6 +2466,41 @@ impl Timeline {
false
}
}
pub(crate) fn is_basebackup_cache_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.basebackup_cache_enabled
.unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled)
}
/// Prepare basebackup for the given LSN and store it in the basebackup cache.
/// The method is asynchronous and returns immediately.
/// The actual basebackup preparation is performed in the background
/// by the basebackup cache on a best-effort basis.
pub(crate) fn prepare_basebackup(&self, lsn: Lsn) {
if !self.is_basebackup_cache_enabled() {
return;
}
if !self.tenant_shard_id.is_shard_zero() {
// In theory we should never get here, but just in case check it.
// Preparing basebackup doesn't make sense for shards other than shard zero.
return;
}
let res = self
.basebackup_prepare_sender
.send(BasebackupPrepareRequest {
tenant_shard_id: self.tenant_shard_id,
timeline_id: self.timeline_id,
lsn,
});
if let Err(e) = res {
// May happen during shutdown, it's not critical.
info!("Failed to send shutdown checkpoint: {e:#}");
}
}
}
/// Number of times we will compute partition within a checkpoint distance.
@@ -2545,6 +2578,13 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub(crate) fn get_pitr_interval(&self) -> Duration {
let tenant_conf = &self.tenant_conf.load().tenant_conf;
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -2820,6 +2860,13 @@ impl Timeline {
self.remote_client.update_config(&new_conf.location);
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity {
if new_capacity != rel_size_cache.capacity() {
rel_size_cache.set_capacity(new_capacity);
}
}
self.metrics
.evictions_with_low_residence_duration
.write()
@@ -2878,6 +2925,14 @@ impl Timeline {
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
}
let relsize_snapshot_cache_capacity = {
let loaded_tenant_conf = tenant_conf.load();
loaded_tenant_conf
.tenant_conf
.relsize_snapshot_cache_capacity
.unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity)
};
Arc::new_cyclic(|myself| {
let metrics = Arc::new(TimelineMetrics::new(
&tenant_shard_id,
@@ -2969,10 +3024,8 @@ impl Timeline {
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
complete_as_of: disk_consistent_lsn,
map: HashMap::new(),
}),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
download_all_remote_layers_task_info: RwLock::new(None),
@@ -3017,6 +3070,8 @@ impl Timeline {
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
basebackup_prepare_sender: resources.basebackup_prepare_sender,
};
result.repartition_threshold =
@@ -3530,7 +3585,7 @@ impl Timeline {
};
let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref.conf.get_vectored_concurrent_io,
self_ref
.gate
.enter()
@@ -5559,7 +5614,7 @@ impl Timeline {
});
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,
@@ -6230,14 +6285,12 @@ impl Timeline {
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
if cfg!(test) {
if cfg!(test) && pitr == Duration::ZERO {
// Unit tests which specify zero PITR interval expect to avoid doing any I/O for timestamp lookup
if pitr == Duration::ZERO {
return Ok(GcCutoffs {
time: self.get_last_record_lsn(),
space: space_cutoff,
});
}
return Ok(GcCutoffs {
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
});
}
// Calculate a time-based limit on how much to retain:
@@ -6251,14 +6304,14 @@ impl Timeline {
// PITR is not set. Retain the size-based limit, or the default time retention,
// whichever requires less data.
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: std::cmp::max(time_cutoff, space_cutoff),
}
}
(Duration::ZERO, None) => {
// PITR is not set, and time lookup failed
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
}
}
@@ -6266,7 +6319,7 @@ impl Timeline {
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
// cannot advance beyond what was already GC'd, and respect space-based retention
GcCutoffs {
time: *self.get_applied_gc_cutoff_lsn(),
time: Some(*self.get_applied_gc_cutoff_lsn()),
space: space_cutoff,
}
}
@@ -6274,7 +6327,7 @@ impl Timeline {
// PITR interval is set and we looked up timestamp successfully. Ignore
// size based retention and make time cutoff authoritative
GcCutoffs {
time: time_cutoff,
time: Some(time_cutoff),
space: time_cutoff,
}
}
@@ -6327,7 +6380,7 @@ impl Timeline {
)
};
let mut new_gc_cutoff = Lsn::min(space_cutoff, time_cutoff);
let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
@@ -6376,7 +6429,7 @@ impl Timeline {
async fn gc_timeline(
&self,
space_cutoff: Lsn,
time_cutoff: Lsn,
time_cutoff: Option<Lsn>, // None if uninitialized
retain_lsns: Vec<Lsn>,
max_lsn_with_valid_lease: Option<Lsn>,
new_gc_cutoff: Lsn,
@@ -6395,6 +6448,12 @@ impl Timeline {
return Ok(result);
}
let Some(time_cutoff) = time_cutoff else {
// The GC cutoff should have been computed by now, but let's be defensive.
info!("Nothing to GC: time_cutoff not yet computed");
return Ok(result);
};
// We need to ensure that no one tries to read page versions or create
// branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
// for details. This will block until the old value is no longer in use.

View File

@@ -1526,7 +1526,7 @@ impl Timeline {
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
checked {layers_checked}/{layers_total} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
(latest_gc_cutoff={} pitr_cutoff={:?})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,

View File

@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.conf.get_vectored_concurrent_io,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);

View File

@@ -1316,6 +1316,10 @@ impl WalIngest {
}
});
if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
modification.tline.prepare_basebackup(lsn);
}
Ok(())
}
@@ -1684,31 +1688,31 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1719,7 +1723,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x20)),
Version::at(Lsn(0x20)),
&ctx,
io_concurrency.clone()
)
@@ -1733,7 +1737,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x30)),
Version::at(Lsn(0x30)),
&ctx,
io_concurrency.clone()
)
@@ -1747,7 +1751,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1760,7 +1764,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1774,7 +1778,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1787,7 +1791,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1800,7 +1804,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1820,7 +1824,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
2
);
@@ -1829,7 +1833,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1842,7 +1846,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1854,7 +1858,7 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1863,7 +1867,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1880,7 +1884,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1893,7 +1897,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
.await?,
2
);
@@ -1902,7 +1906,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1915,7 +1919,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1932,7 +1936,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
1501
);
@@ -1942,7 +1946,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1956,7 +1960,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1500,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1990,13 +1994,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -2011,7 +2015,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -2029,13 +2033,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2077,26 +2081,26 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2110,7 +2114,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(lsn),
Version::at(lsn),
&ctx,
io_concurrency.clone()
)
@@ -2131,7 +2135,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2144,7 +2148,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -2157,7 +2161,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2169,7 +2173,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -2193,13 +2197,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2212,7 +2216,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -2250,7 +2254,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2264,7 +2268,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2279,7 +2283,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2297,7 +2301,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

View File

@@ -936,6 +936,44 @@ lfc_prewarm_main(Datum main_arg)
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
void
lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
{
BufferTag tag;
FileCacheEntry *entry;
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
for (BlockNumber blkno = 0; blkno < nblocks; blkno += lfc_blocks_per_chunk)
{
tag.blockNum = blkno;
hash = get_hash_value(lfc_hash, &tag);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
lfc_ctl->used_pages -= 1;
SET_STATE(entry, i, UNAVAILABLE);
}
}
}
}
}
LWLockRelease(lfc_lock);
}
/*
* Check if page is present in the cache.

View File

@@ -28,6 +28,7 @@ typedef struct FileCacheState
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);

View File

@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif
#define NRelFileInfoInvalidate(rinfo) do { \
NInfoGetSpcOid(rinfo) = InvalidOid; \
NInfoGetDbOid(rinfo) = InvalidOid; \
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
} while (0)
#if PG_MAJORVERSION_NUM < 17
#define ProcNumber BackendId
#define INVALID_PROC_NUMBER InvalidBackendId

View File

@@ -108,7 +108,7 @@ typedef enum
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;
static SMgrRelation unlogged_build_rel = NULL;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
@@ -912,16 +912,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdextend(reln, forkNum, blkno, buffer, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdextend(reln, forkNum, blkno, buffer, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
return;
default:
@@ -1003,21 +1006,19 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
{
for (int i = 0; i < nblocks; i++)
{
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
}
}
return;
default:
@@ -1387,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdread(reln, forkNum, blkno, buffer);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1474,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdreadv(reln, forknum, blocknum, buffers, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1608,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1617,9 +1639,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -1680,14 +1699,16 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -1723,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
return mdnblocks(reln, forknum);
}
break;
case RELPERSISTENCE_TEMP:
@@ -1792,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdtruncate(reln, forknum, old_blocks, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1930,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
*/
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL);
ereport(SmgrTrace,
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
@@ -1947,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
@@ -1968,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
/* Make the relation look like it's unlogged */
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
/*
* Create the local file. In a parallel build, the leader is expected to
* call this first and do it.
@@ -2000,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
static void
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
{
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
RelFileInfoFmt((unlogged_build_rel_info)))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* In a parallel build, (only) the leader process performs the 2nd
@@ -2018,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
*/
if (IsParallelWorker())
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
else
@@ -2039,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
{
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
RelFileInfoFmt(unlogged_build_rel_info))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
@@ -2051,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
BlockNumber nblocks;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* Update the last-written LSN cache.
@@ -2072,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
InfoFromNInfoB(rinfob),
MAIN_FORKNUM);
/* Make the relation look permanent again */
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
@@ -2083,6 +2104,8 @@ neon_end_unlogged_build(SMgrRelation reln)
forknum);
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
mdclose(reln, forknum);
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
@@ -2093,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
@@ -2166,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
* Forget about any build we might have had in progress. The local
* file will be unlinked by smgrDoPendingDeletes()
*/
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
break;
@@ -2178,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
case XACT_EVENT_PRE_PREPARE:
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),

32
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "5.0.0"
description = "A Flask extension adding a decorator for CORS support"
version = "6.0.0"
description = "A Flask extension simplifying CORS support"
optional = false
python-versions = "*"
python-versions = "<4.0,>=3.9"
groups = ["main"]
files = [
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
]
[package.dependencies]
Flask = ">=0.9"
flask = ">=0.9"
Werkzeug = ">=0.7"
[[package]]
name = "frozenlist"
@@ -3169,19 +3170,24 @@ pbr = "*"
[[package]]
name = "setuptools"
version = "70.0.0"
version = "78.1.1"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
{file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
{file = "setuptools-78.1.1-py3-none-any.whl", hash = "sha256:c3a9c4211ff4c309edb8b8c4f1cbfa7ae324c4ba9f91ff254e3d305b9fd54561"},
{file = "setuptools-78.1.1.tar.gz", hash = "sha256:fcc17fd9cd898242f6b4adfaca46137a9edef687f43e6f78469692a5e70d851d"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov ; platform_python_implementation != \"PyPy\"", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"]
[[package]]
name = "six"

View File

@@ -127,3 +127,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"

View File

@@ -80,10 +80,22 @@ impl std::fmt::Display for Backend<'_, ()> {
.field(&endpoint.url())
.finish(),
#[cfg(any(test, feature = "testing"))]
ControlPlaneClient::PostgresMock(endpoint) => fmt
.debug_tuple("ControlPlane::PostgresMock")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::PostgresMock(endpoint) => {
let url = endpoint.url();
match url::Url::parse(url) {
Ok(mut url) => {
let _ = url.set_password(Some("_redacted_"));
let url = url.as_str();
fmt.debug_tuple("ControlPlane::PostgresMock")
.field(&url)
.finish()
}
Err(_) => fmt
.debug_tuple("ControlPlane::PostgresMock")
.field(&url)
.finish(),
}
}
#[cfg(test)]
ControlPlaneClient::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
},

View File

@@ -1,9 +1,13 @@
#[cfg(any(test, feature = "testing"))]
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
#[cfg(any(test, feature = "testing"))]
use anyhow::Context;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
use futures::future::Either;
@@ -23,6 +27,7 @@ use crate::config::{
ProxyConfig, ProxyProtocolV2, remote_storage_from_toml,
};
use crate::context::parquet::ParquetUploadArgs;
use crate::control_plane::client::cplane_proxy_v1::{GeoProximity, RegionProximityMap};
use crate::http::health_server::AppMetrics;
use crate::metrics::Metrics;
use crate::rate_limiter::{
@@ -35,6 +40,8 @@ use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
use crate::url::ApiUrl;
use crate::{auth, control_plane, http, serverless, usage_metrics};
project_git_version!(GIT_VERSION);
@@ -161,8 +168,11 @@ struct ProxyCliArgs {
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value = "1024")]
#[clap(long, default_value_t = 1024)]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
@@ -542,7 +552,12 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);
@@ -752,12 +767,21 @@ fn build_auth_backend(
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let geo_map = Box::leak(Box::new(RegionProximityMap::from([(
args.region.clone(),
GeoProximity {
_weight: 1,
_distance: 0,
},
)])));
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
geo_map,
);
let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
@@ -769,7 +793,13 @@ fn build_auth_backend(
#[cfg(any(test, feature = "testing"))]
AuthBackendType::Postgres => {
let url = args.auth_endpoint.parse()?;
let mut url: ApiUrl = args.auth_endpoint.parse()?;
if url.password().is_none() {
let password = env::var("PGPASSWORD")
.with_context(|| "auth-endpoint does not contain a password and environment variable `PGPASSWORD` is not set")?;
url.set_password(Some(&password))
.expect("Failed to set password");
}
let api = control_plane::client::mock::MockControlPlane::new(
url,
!args.is_private_access_proxy,
@@ -825,6 +855,14 @@ fn build_auth_backend(
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let geo_map = Box::leak(Box::new(RegionProximityMap::from([(
args.region.clone(),
GeoProximity {
_weight: 1,
_distance: 0,
},
)])));
// Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter
// and locks are not used in ConsoleRedirectBackend,
// but they are required by the NeonControlPlaneClient
@@ -834,6 +872,7 @@ fn build_auth_backend(
caches,
locks,
wake_compute_endpoint_rate_limiter,
geo_map,
);
let backend = ConsoleRedirectBackend::new(url, api);

View File

@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -231,12 +229,13 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
batch_size: usize,
) -> anyhow::Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
let mut batch = Vec::with_capacity(batch_size);
let mut pipeline = Pipeline::with_capacity(batch_size);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
if rx.recv_many(&mut batch, batch_size).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
@@ -367,8 +366,7 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
.await
tx.try_send(op)
.map_err(|e| {
tracing::warn!("failed to send GetCancelData for {key}: {e}");
})
@@ -570,7 +568,7 @@ impl Session {
}
// Send the store key op to the cancellation handler and set TTL for the key
pub(crate) async fn write_cancel_key(
pub(crate) fn write_cancel_key(
&self,
cancel_closure: CancelClosure,
) -> Result<(), CancelError> {
@@ -596,14 +594,14 @@ impl Session {
expire: CANCEL_KEY_TTL,
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
});
Ok(())
}
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
let Some(tx) = &self.cancellation_handler.tx else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
@@ -619,7 +617,7 @@ impl Session {
.guard(RedisMsgKind::HDel),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
});

View File

@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -296,6 +296,10 @@ impl RequestContext {
.has_private_peer_addr()
}
pub fn is_global(&self) -> bool {
self.0.try_lock().expect("should not deadlock").region == "global"
}
pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
let mut this = self.0.try_lock().expect("should not deadlock");
// Do not record errors from the private address to metrics.

View File

@@ -1,5 +1,6 @@
//! Production console backend.
use std::collections::HashMap;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
@@ -12,7 +13,10 @@ use postgres_client::config::SslMode;
use tokio::time::Instant;
use tracing::{Instrument, debug, info, info_span, warn};
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
use super::super::messages::{
ControlPlaneErrorMessage, GetEndpointAccessControl, GetEndpointAccessControlReplicated,
WakeCompute,
};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::cache::Cached;
@@ -34,6 +38,15 @@ use crate::{compute, http, scram};
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
pub type RegionId = String;
pub type RegionProximityMap = HashMap<RegionId, GeoProximity>;
#[derive(Clone)]
pub struct GeoProximity {
pub _weight: u64, // load or preference-based parameter
pub _distance: u64, // approximate distance from the region to the current proxy
}
#[derive(Clone)]
pub struct NeonControlPlaneClient {
endpoint: http::Endpoint,
@@ -42,6 +55,7 @@ pub struct NeonControlPlaneClient {
pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
// put in a shared ref so we don't copy secrets all over in memory
jwt: Arc<str>,
geo_map: &'static RegionProximityMap,
}
impl NeonControlPlaneClient {
@@ -52,6 +66,7 @@ impl NeonControlPlaneClient {
caches: &'static ApiCaches,
locks: &'static ApiLocks<EndpointCacheKey>,
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
geo_map: &'static RegionProximityMap,
) -> Self {
Self {
endpoint,
@@ -59,6 +74,7 @@ impl NeonControlPlaneClient {
locks,
wake_compute_endpoint_rate_limiter,
jwt,
geo_map,
}
}
@@ -81,10 +97,126 @@ impl NeonControlPlaneClient {
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
if ctx.is_global() {
return self
.do_get_auth_req_replicated(user_info, &ctx.session_id(), Some(ctx))
.await;
}
self.do_get_auth_req(user_info, &ctx.session_id(), Some(ctx))
.await
}
async fn do_get_auth_req_replicated(
&self,
user_info: &ComputeUserInfo,
session_id: &uuid::Uuid,
ctx: Option<&RequestContext>,
) -> Result<AuthInfo, GetAuthInfoError> {
let request_id: String = session_id.to_string();
let application_name = if let Some(ctx) = ctx {
ctx.console_application_name()
} else {
"auth_cancellation".to_string()
};
async {
let request = self
.endpoint
.get_path("get_endpoint_access_control_replicated")
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", session_id)])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
("role", user_info.user.as_str()),
])
.build()?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let response = match ctx {
Some(ctx) => {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let rsp = self.endpoint.execute(request).await;
drop(pause);
rsp?
}
None => self.endpoint.execute(request).await?,
};
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetEndpointAccessControlReplicated>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
// TODO(anna): retry
Err(e) => {
return if e.get_reason().is_not_found() {
// TODO: refactor this because it's weird
// this is a failure to authenticate but we return Ok.
Ok(AuthInfo::default())
} else {
Err(e.into())
};
}
};
for endpoint in body.endpoints {
if let Some(region_id) = &endpoint.region_id {
if let Some(_proximity) = self.geo_map.get(region_id) {
// TODO:: calculate proximity and reroute
let secret = if endpoint.role_secret.is_empty() {
None
} else {
let secret = scram::ServerSecret::parse(&endpoint.role_secret)
.map(AuthSecret::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Some(secret)
};
let allowed_ips = endpoint.allowed_ips.unwrap_or_default();
Metrics::get()
.proxy
.allowed_ips_number
.observe(allowed_ips.len() as f64);
let allowed_vpc_endpoint_ids =
endpoint.allowed_vpc_endpoint_ids.unwrap_or_default();
Metrics::get()
.proxy
.allowed_vpc_endpoint_ids
.observe(allowed_vpc_endpoint_ids.len() as f64);
let block_public_connections =
endpoint.block_public_connections.unwrap_or_default();
let block_vpc_connections =
endpoint.block_vpc_connections.unwrap_or_default();
// return the closest replica
return Ok(AuthInfo {
secret,
allowed_ips,
allowed_vpc_endpoint_ids,
project_id: endpoint.project_id,
account_id: endpoint.account_id,
access_blocker_flags: AccessBlockerFlags {
public_access_blocked: block_public_connections,
vpc_access_blocked: block_vpc_connections,
},
});
}
return Err(GetAuthInfoError::RegionNotFound);
}
}
Err(GetAuthInfoError::RegionNotFound)
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_get_auth_info"))
.await
}
async fn do_get_auth_req(
&self,
user_info: &ComputeUserInfo,

View File

@@ -99,6 +99,9 @@ pub(crate) enum GetAuthInfoError {
#[error(transparent)]
ApiError(ControlPlaneError),
#[error("No endpoint found in this region")]
RegionNotFound,
}
// This allows more useful interactions than `#[from]`.
@@ -115,6 +118,7 @@ impl UserFacingError for GetAuthInfoError {
Self::BadSecret => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
Self::ApiError(e) => e.to_string_client(),
Self::RegionNotFound => "No endpoint found in this region".to_owned(),
}
}
}
@@ -124,6 +128,7 @@ impl ReportableError for GetAuthInfoError {
match self {
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
Self::RegionNotFound => crate::error::ErrorKind::User,
}
}
}

View File

@@ -222,6 +222,25 @@ pub(crate) struct UserFacingMessage {
pub(crate) message: Box<str>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
/// Returned by the `/get_endpoint_access_control_replicated` API method.
#[derive(Deserialize)]
pub(crate) struct GetEndpointAccessControlReplicated {
pub(crate) endpoints: Vec<EndpointAccessControlReplicated>,
}
#[derive(Deserialize)]
pub(crate) struct EndpointAccessControlReplicated {
pub(crate) role_secret: Box<str>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
pub(crate) project_id: Option<ProjectIdInt>,
pub(crate) account_id: Option<AccountIdInt>,
pub(crate) block_public_connections: Option<bool>,
pub(crate) block_vpc_connections: Option<bool>,
pub(crate) region_id: Option<String>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
/// Returned by the `/get_endpoint_access_control` API method.
#[derive(Deserialize)]

View File

@@ -1,13 +1,11 @@
use std::cell::{Cell, RefCell};
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::{array, env, fmt, io};
use std::{env, io};
use chrono::{DateTime, Utc};
use indexmap::IndexSet;
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
use tracing::subscriber::Interest;
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
@@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
use try_lock::TryLock;
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
StderrWriter {
stderr: std::io::stderr(),
},
["request_id", "session_id", "conn_id"],
&["request_id", "session_id", "conn_id"],
))
} else {
None
@@ -183,50 +180,65 @@ impl Clock for RealClock {
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";
/// Tracing used to enforce that spans/events have no more than 32 fields.
/// It seems this is no longer the case, but it's still documented in some places.
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
/// rely on it for some (minor) performance gains.
const MAX_TRACING_FIELDS: usize = 32;
thread_local! {
/// Protects against deadlocks and double panics during log writing.
/// The current panic handler will use tracing to log panic information.
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
/// Thread-local instance with per-thread buffer for log writing.
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
/// Cached OS thread ID.
static THREAD_ID: u64 = gettid::gettid();
}
/// Map for values fixed at callsite registration.
// We use papaya here because registration rarely happens post-startup.
// papaya is good for read-heavy workloads.
//
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
type CallsiteMap<T> =
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
clock: C,
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
writer: W,
// We use a const generic and arrays to bypass one heap allocation.
extract_fields: IndexSet<&'static str>,
_marker: std::marker::PhantomData<[&'static str; F]>,
/// tracks which fields of each **event** are duplicates
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
span_info: CallsiteMap<CallsiteSpanInfo>,
/// Fields we want to keep track of in a separate json object.
extract_fields: &'static [&'static str],
}
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
JsonLoggingLayer {
clock,
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
skipped_field_indices: CallsiteMap::default(),
span_info: CallsiteMap::default(),
writer,
extract_fields: IndexSet::from_iter(extract_fields),
_marker: std::marker::PhantomData,
extract_fields,
}
}
#[inline]
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
*self
.callsite_ids
fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
self.span_info
.pin()
.get_or_insert_with(cs, CallsiteId::next)
.get_or_insert_with(metadata.callsite(), || {
CallsiteSpanInfo::new(metadata, self.extract_fields)
})
.clone()
}
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
for JsonLoggingLayer<C, W, F>
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
@@ -237,35 +249,25 @@ where
// early, before OTel machinery, and add as event extension.
let now = self.clock.now();
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
if entered.get() {
let mut formatter = EventFormatter::new();
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
} else {
entered.set(true);
defer!(entered.set(false););
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
let mut borrow = f.try_borrow_mut();
let formatter = match borrow.as_deref_mut() {
Ok(formatter) => formatter,
// If the thread local formatter is borrowed,
// then we likely hit an edge case were we panicked during formatting.
// We allow the logging to proceed with an uncached formatter.
Err(_) => &mut EventFormatter::new(),
};
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
formatter.reset();
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
})
}
formatter.reset();
formatter.format(
now,
event,
&ctx,
&self.skipped_field_indices,
self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
});
// In case logging fails we generate a simpler JSON object.
@@ -287,50 +289,48 @@ where
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
let mut exts = span.extensions_mut();
let mut fields = SpanFields::new(self.span_info(span.metadata()));
attrs.record(&mut fields);
exts.insert(fields);
// This is a new span: the extensions should not be locked
// unless some layer spawned a thread to process this span.
// I don't think any layers do that.
span.extensions_mut().insert(fields);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let ext = span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
data.record_fields(values);
// assumption: `on_record` is rarely called.
// assumption: a span being updated by one thread,
// and formatted by another thread is even rarer.
let mut ext = span.extensions_mut();
if let Some(fields) = ext.get_mut::<SpanFields>() {
values.record(fields);
}
}
/// Called (lazily) whenever a new log call is executed. We quickly check
/// for duplicate field names and record duplicates as skippable. Last one
/// wins.
/// Called (lazily) roughly once per event/span instance. We quickly check
/// for duplicate field names and record duplicates as skippable. Last field wins.
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
debug_assert!(
metadata.fields().len() <= MAX_TRACING_FIELDS,
"callsite {metadata:?} has too many fields."
);
if !metadata.is_event() {
self.callsite_id(metadata.callsite());
// register the span info.
self.span_info(metadata);
// Must not be never because we wouldn't get trace and span data.
return Interest::always();
}
let mut field_indices = SkippedFieldIndices::default();
let mut seen_fields = HashMap::<&'static str, usize>::new();
let mut seen_fields = HashMap::new();
for field in metadata.fields() {
use std::collections::hash_map::Entry;
match seen_fields.entry(field.name()) {
Entry::Vacant(entry) => {
// field not seen yet
entry.insert(field.index());
}
Entry::Occupied(mut entry) => {
// replace currently stored index
let old_index = entry.insert(field.index());
// ... and append it to list of skippable indices
field_indices.push(old_index);
}
if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
field_indices.set(old_index);
}
}
@@ -344,110 +344,113 @@ where
}
}
#[derive(Copy, Clone, Debug, Default)]
#[repr(transparent)]
struct CallsiteId(u32);
/// Any span info that is fixed to a particular callsite. Not variable between span instances.
#[derive(Clone)]
struct CallsiteSpanInfo {
/// index of each field to extract. usize::MAX if not found.
extract: Arc<[usize]>,
impl CallsiteId {
#[inline]
fn next() -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
}
/// tracks the fixed "callsite ID" for each span.
/// note: this is not stable between runs.
normalized_name: Arc<str>,
}
impl fmt::Display for CallsiteId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
impl CallsiteSpanInfo {
fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
// get all the indices of span fields we want to focus
let extract = extract_fields
.iter()
// use rposition, since we want last match wins.
.map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
.collect();
// normalized_name is unique for each callsite, but it is not
// unified across separate proxy instances.
// todo: can we do better here?
let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
let normalized_name = format!("{}#{cid}", metadata.name()).into();
Self {
extract,
normalized_name,
}
}
}
/// Stores span field values recorded during the spans lifetime.
#[derive(Default)]
struct SpanFields {
// TODO: Switch to custom enum with lasso::Spur for Strings?
fields: papaya::HashMap<&'static str, serde_json::Value>,
values: [serde_json::Value; MAX_TRACING_FIELDS],
/// cached span info so we can avoid extra hashmap lookups in the hot path.
span_info: CallsiteSpanInfo,
}
impl SpanFields {
#[inline]
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
fields.record(&mut SpanFieldsRecorder {
fields: self.fields.pin(),
});
fn new(span_info: CallsiteSpanInfo) -> Self {
Self {
span_info,
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
}
}
}
/// Implements a tracing field visitor to convert and store values.
struct SpanFieldsRecorder<'m, S, G> {
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
}
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
impl tracing::field::Visit for SpanFields {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
}
#[inline]
@@ -456,38 +459,33 @@ impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecor
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
#[derive(Clone, Default)]
#[derive(Copy, Clone, Default)]
struct SkippedFieldIndices {
bits: u64,
// 32-bits is large enough for `MAX_TRACING_FIELDS`
bits: u32,
}
impl SkippedFieldIndices {
#[inline]
fn is_empty(&self) -> bool {
fn is_empty(self) -> bool {
self.bits == 0
}
#[inline]
fn push(&mut self, index: usize) {
self.bits |= 1u64
.checked_shl(index as u32)
.expect("field index too large");
fn set(&mut self, index: usize) {
debug_assert!(index <= 32, "index out of bounds of 32-bit set");
self.bits |= 1 << index;
}
#[inline]
fn contains(&self, index: usize) -> bool {
self.bits
& 1u64
.checked_shl(index as u32)
.expect("field index too large")
!= 0
fn contains(self, index: usize) -> bool {
self.bits & (1 << index) != 0
}
}
@@ -499,7 +497,7 @@ struct EventFormatter {
impl EventFormatter {
#[inline]
fn new() -> Self {
const fn new() -> Self {
EventFormatter {
logline_buffer: Vec::new(),
}
@@ -515,14 +513,13 @@ impl EventFormatter {
self.logline_buffer.clear();
}
fn format<S, const F: usize>(
fn format<S>(
&mut self,
now: DateTime<Utc>,
event: &Event<'_>,
ctx: &Context<'_, S>,
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
extract_fields: &IndexSet<&'static str>,
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
extract_fields: &'static [&'static str],
) -> io::Result<()>
where
S: Subscriber + for<'a> LookupSpan<'a>,
@@ -533,8 +530,11 @@ impl EventFormatter {
let normalized_meta = event.normalized_metadata();
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
let skipped_field_indices = skipped_field_indices.pin();
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
let skipped_field_indices = skipped_field_indices
.pin()
.get(&meta.callsite())
.copied()
.unwrap_or_default();
let mut serialize = || {
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
@@ -565,9 +565,11 @@ impl EventFormatter {
}
let spans = SerializableSpans {
ctx,
callsite_ids,
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
// collect all spans from parent to root.
spans: ctx
.event_span(event)
.map_or(vec![], |parent| parent.scope().collect()),
extracted: ExtractedSpanFields::new(extract_fields),
};
serializer.serialize_entry("spans", &spans)?;
@@ -620,9 +622,9 @@ impl EventFormatter {
}
}
if spans.extract.has_values() {
if spans.extracted.has_values() {
// TODO: add fields from event, too?
serializer.serialize_entry("extract", &spans.extract)?;
serializer.serialize_entry("extract", &spans.extracted)?;
}
serializer.end()
@@ -635,15 +637,15 @@ impl EventFormatter {
}
/// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
skipped_field_indices: SkippedFieldIndices,
state: Option<Result<(), S::Error>>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
skipped_field_indices,
@@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_none()
&& field.name() == MESSAGE_FIELD
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
&& !self.skipped_field_indices.contains(field.index())
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
@@ -751,14 +751,14 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
struct FieldsPresent(pub bool, SkippedFieldIndices);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent<'_> {
impl tracing::field::Visit for FieldsPresent {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.is_some_and(|i| i.contains(field.index()))
if !self.1.contains(field.index())
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
@@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> {
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(
&'a tracing::Event<'event>,
Option<&'a SkippedFieldIndices>,
);
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
}
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
skipped_field_indices: SkippedFieldIndices,
state: Result<(), S::Error>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
skipped_field_indices,
@@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
self.state.is_ok()
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
&& !self.skipped_field_indices.contains(field.index())
}
#[inline]
@@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
@@ -905,18 +900,17 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
struct SerializableSpans<'ctx, S>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
S: for<'lookup> LookupSpan<'lookup>,
{
ctx: &'a Context<'ctx, Span>,
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
extract: ExtractedSpanFields<'a, F>,
spans: Vec<SpanRef<'ctx, S>>,
extracted: ExtractedSpanFields,
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
S: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
@@ -924,25 +918,22 @@ where
{
let mut serializer = serializer.serialize_map(None)?;
if let Some(leaf_span) = self.ctx.lookup_current() {
for span in leaf_span.scope().from_root() {
// Append a numeric callsite ID to the span name to keep the name unique
// in the JSON object.
let cid = self
.callsite_ids
.pin()
.get(&span.metadata().callsite())
.copied()
.unwrap_or_default();
for span in self.spans.iter().rev() {
let ext = span.extensions();
// Loki turns the # into an underscore during field name concatenation.
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
serializer.serialize_value(&SerializableSpanFields {
span: &span,
extract: &self.extract,
})?;
}
self.extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
serializer.serialize_entry(
&*span_info.normalized_name,
&SerializableSpanFields {
fields: span.metadata().fields(),
values,
},
)?;
}
serializer.end()
@@ -950,80 +941,77 @@ where
}
/// Serializes the span fields as object.
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
span: &'a SpanRef<'span, Span>,
extract: &'a ExtractedSpanFields<'a, F>,
struct SerializableSpanFields<'span> {
fields: &'span tracing::field::FieldSet,
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
impl serde::ser::Serialize for SerializableSpanFields<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let ext = self.span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
for (name, value) in &data.fields.pin() {
serializer.serialize_entry(name, value)?;
// TODO: replace clone with reference, if possible.
self.extract.set(name, value.clone());
for (field, value) in std::iter::zip(self.fields, self.values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(field.name(), value)?;
}
serializer.end()
}
}
struct ExtractedSpanFields<'a, const F: usize> {
names: &'a IndexSet<&'static str>,
// TODO: replace TryLock with something local thread and interior mutability.
// serde API doesn't let us use `mut`.
values: TryLock<([Option<serde_json::Value>; F], bool)>,
struct ExtractedSpanFields {
names: &'static [&'static str],
values: RefCell<Vec<serde_json::Value>>,
}
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
fn new(names: &'a IndexSet<&'static str>) -> Self {
impl ExtractedSpanFields {
fn new(names: &'static [&'static str]) -> Self {
ExtractedSpanFields {
names,
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
}
}
#[inline]
fn set(&self, name: &'static str, value: serde_json::Value) {
if let Some((index, _)) = self.names.get_full(name) {
let mut fields = self.values.try_lock().expect("thread-local use");
fields.0[index] = Some(value);
fields.1 = true;
fn layer_span(&self, fields: &SpanFields) {
let mut v = self.values.borrow_mut();
let SpanFields { values, span_info } = fields;
// extract the fields
for (i, &j) in span_info.extract.iter().enumerate() {
let Some(value) = values.get(j) else { continue };
if !value.is_null() {
// TODO: replace clone with reference, if possible.
v[i] = value.clone();
}
}
}
#[inline]
fn has_values(&self) -> bool {
self.values.try_lock().expect("thread-local use").1
self.values.borrow().iter().any(|v| !v.is_null())
}
}
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
impl serde::ser::Serialize for ExtractedSpanFields {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let values = self.values.try_lock().expect("thread-local use");
for (i, value) in values.0.iter().enumerate() {
if let Some(value) = value {
let key = self.names[i];
serializer.serialize_entry(key, value)?;
let values = self.values.borrow();
for (key, value) in std::iter::zip(self.names, &*values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(key, value)?;
}
serializer.end()
@@ -1032,7 +1020,6 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
#[cfg(test)]
mod tests {
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, MutexGuard};
use assert_json_diff::assert_json_eq;
@@ -1081,10 +1068,9 @@ mod tests {
let log_layer = JsonLoggingLayer {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
span_info: papaya::HashMap::default(),
writer: buffer.clone(),
extract_fields: IndexSet::from_iter(["x"]),
_marker: PhantomData::<[&'static str; 1]>,
extract_fields: &["x"],
};
let registry = tracing_subscriber::Registry::default().with(log_layer);

View File

@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -94,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
res
}

View File

@@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
use postgres_client::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
!matches!(
let non_retriable_pg_errors = matches!(
self.code(),
&SqlState::TOO_MANY_CONNECTIONS
| &SqlState::OUT_OF_MEMORY
@@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
| &SqlState::T_R_SERIALIZATION_FAILURE
| &SqlState::INVALID_CATALOG_NAME
| &SqlState::INVALID_SCHEMA_NAME
| &SqlState::INVALID_PARAMETER_VALUE
)
| &SqlState::INVALID_PARAMETER_VALUE,
);
if non_retriable_pg_errors {
return false;
}
// PGBouncer errors that should not trigger a wake_compute retry.
if self.code() == &SqlState::PROTOCOL_VIOLATION {
// Source for the error message:
// https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070
return !self
.message()
.contains("no more connections allowed (max_client_conn)");
}
true
}
}
@@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati
.base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
}
#[cfg(test)]
mod tests {
use super::ShouldRetryWakeCompute;
use postgres_client::error::{DbError, SqlState};
#[test]
fn should_retry_wake_compute_for_db_error() {
// These SQLStates should NOT trigger a wake_compute retry.
let non_retry_states = [
SqlState::TOO_MANY_CONNECTIONS,
SqlState::OUT_OF_MEMORY,
SqlState::SYNTAX_ERROR,
SqlState::T_R_SERIALIZATION_FAILURE,
SqlState::INVALID_CATALOG_NAME,
SqlState::INVALID_SCHEMA_NAME,
SqlState::INVALID_PARAMETER_VALUE,
];
for state in non_retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
!err.should_retry_wake_compute(),
"State {state:?} unexpectedly retried"
);
}
// Errors coming from pgbouncer should not trigger a wake_compute retry
let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"];
for error in non_retry_pgbouncer_errors {
let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string());
assert!(
!err.should_retry_wake_compute(),
"PGBouncer error {error:?} unexpectedly retried"
);
}
// These SQLStates should trigger a wake_compute retry.
let retry_states = [
SqlState::CONNECTION_FAILURE,
SqlState::CONNECTION_EXCEPTION,
SqlState::CONNECTION_DOES_NOT_EXIST,
SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
];
for state in retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
err.should_retry_wake_compute(),
"State {state:?} unexpectedly skipped retry"
);
}
}
}

View File

@@ -15,6 +15,7 @@ use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::DuplexStream;
use tracing_test::traced_test;
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
@@ -381,8 +382,14 @@ enum ConnectAction {
WakeFail,
WakeRetry,
Connect,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = true
Retry,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = false
RetryNoWake,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = true
Fail,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = false
FailNoWake,
}
#[derive(Clone)]
@@ -424,6 +431,7 @@ struct TestConnection;
#[derive(Debug)]
struct TestConnectError {
retryable: bool,
wakeable: bool,
kind: crate::error::ErrorKind,
}
@@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError {
}
impl ShouldRetryWakeCompute for TestConnectError {
fn should_retry_wake_compute(&self) -> bool {
true
self.wakeable
}
}
@@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism {
ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError {
retryable: true,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::RetryNoWake => Err(TestConnectError {
retryable: true,
wakeable: false,
kind: ErrorKind::Compute,
}),
ConnectAction::Fail => Err(TestConnectError {
retryable: false,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::FailNoWake => Err(TestConnectError {
retryable: false,
wakeable: false,
kind: ErrorKind::Compute,
}),
x => panic!("expecting action {x:?}, connect is called instead"),
@@ -709,3 +729,92 @@ async fn wake_non_retry() {
.unwrap_err();
mechanism.verify();
}
#[tokio::test]
#[traced_test]
async fn fail_but_wake_invalidates_cache() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::Fail,
ConnectAction::Wake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_but_wake_invalidates_cache() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → Retry (retryable + wakeable) → Wake → Connect
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
// Because Retry has wakeable=true, we should see invalidate_cache
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}

View File

@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
// inspired from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L36-L61>
impl Pbkdf2 {
pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
let hmac =
// key the HMAC and derive the first block in-place
let mut hmac =
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
let prev = hmac
.clone()
.chain_update(salt)
.chain_update(1u32.to_be_bytes())
.finalize()
.into_bytes();
hmac.update(salt);
hmac.update(&1u32.to_be_bytes());
let init_block = hmac.finalize_reset().into_bytes();
Self {
hmac,
// one consumed for the hash above
// one iteration spent above
iterations: iterations - 1,
hi: prev,
prev,
hi: init_block,
prev: init_block,
}
}
@@ -44,14 +41,17 @@ impl Pbkdf2 {
iterations,
} = self;
// only do 4096 iterations per turn before sharing the thread for fairness
// only do up to 4096 iterations per turn for fairness
let n = (*iterations).clamp(0, 4096);
for _ in 0..n {
*prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
hmac.update(prev);
let block = hmac.finalize_reset().into_bytes();
for (hi, prev) in hi.iter_mut().zip(*prev) {
*hi ^= prev;
for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
*hi_byte ^= b;
}
*prev = block;
}
*iterations -= n;

View File

@@ -43,6 +43,12 @@ impl std::ops::Deref for ApiUrl {
}
}
impl std::ops::DerefMut for ApiUrl {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl std::fmt::Display for ApiUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)

View File

@@ -184,6 +184,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_tenant_offloaded_timelines",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

View File

@@ -103,7 +103,7 @@ class AbstractNeonCli:
else:
stdout = ""
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
raise
indent = " "

View File

@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": True,
"relsize_snapshot_cache_capacity": 10000,
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
"""
Simple test for basebackup cache.
1. Check that we always hit the cache after compute restart.
2. Check that we eventually delete old basebackup files, but not the latest one.
3. Check that we delete basebackup file for timeline with active compute.
"""
neon_env_builder.pageserver_config_override = """
tenant_config = { basebackup_cache_enabled = true }
basebackup_cache_config = { cleanup_period = '1s' }
"""
env = neon_env_builder.init_start()
ep = env.endpoints.create("main")
ps = env.pageserver
ps_http = ps.http_client()
# 1. Check that we always hit the cache after compute restart.
for i in range(3):
ep.start()
ep.stop()
def check_metrics(i=i):
metrics = ps_http.get_metrics()
# Never miss.
# The first time compute_ctl sends `get_basebackup` with lsn=None, we do not cache such requests.
# All other requests should be a hit
assert (
metrics.query_one(
"pageserver_basebackup_cache_read_total", {"result": "miss"}
).value
== 0
)
# All but the first requests are hits.
assert (
metrics.query_one("pageserver_basebackup_cache_read_total", {"result": "hit"}).value
== i
)
# Every compute shut down should trigger a prepare reuest.
assert (
metrics.query_one(
"pageserver_basebackup_cache_prepare_total", {"result": "ok"}
).value
== i + 1
)
wait_until(check_metrics)
# 2. Check that we eventually delete old basebackup files, but not the latest one.
def check_bb_file_count():
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
# tmp dir + 1 basebackup file.
assert len(bb_files) == 2
wait_until(check_bb_file_count)
# 3. Check that we delete basebackup file for timeline with active compute.
ep.start()
ep.safe_psql("create table t1 as select generate_series(1, 10) as n")
def check_bb_dir_empty():
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
# only tmp dir.
assert len(bb_files) == 1
wait_until(check_bb_dir_empty)

View File

@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
{"name": "role$"},
{"name": "role$$"},
{"name": "role$x$"},
{"name": "x"},
{"name": "xx"},
{"name": "$x"},
{"name": "x$"},
{"name": "$x$"},
{"name": "xx$"},
{"name": "$xx"},
{"name": "$xx$"},
# 63 bytes is the limit for role/DB names in Postgres
{"name": "x" * 63},
]
TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
"name": "db name$x$",
"owner": "role$x$",
},
{
"name": "x",
"owner": "x",
},
{
"name": "xx",
"owner": "xx",
},
{
"name": "$x",
"owner": "$x",
},
{
"name": "x$",
"owner": "x$",
},
{
"name": "$x$",
"owner": "$x$",
},
{
"name": "xx$",
"owner": "xx$",
},
{
"name": "$xx",
"owner": "$xx",
},
{
"name": "$xx$",
"owner": "$xx$",
},
# 63 bytes is the limit for role/DB names in Postgres
{
"name": "x" * 63,
"owner": "x" * 63,
},
]
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can create and work with databases and roles
with special characters (whitespaces, %, tabs, etc.) in the name.
Also use `drop_subscriptions_before_start: true`. We do not actually
have any subscriptions in this test, so it should be no-op, but it
i) simulates the case when we create a second dev branch together with
a new project creation, and ii) just generally stresses more code paths.
"""
env = neon_simple_env
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": [],
"databases": [],

View File

@@ -508,6 +508,9 @@ PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"written_size_since_parent": WrittenDataVerifier, # same as written_size on root
"pitr_cutoff": CannotVerifyAnything,
"pitr_history_size_since_parent": WrittenDataVerifier, # same as written_size on root w/o GC
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
}

View File

@@ -510,7 +510,7 @@ def list_elegible_layers(
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
raise
# Scrub the remote storage

View File

@@ -27,8 +27,9 @@ from contextlib import closing
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
sql = """
CREATE TABLE CHAR_TBL(f1 char(4));
CREATE TABLE FLOAT8_TBL(f1 float8);
CREATE TABLE INT2_TBL(f1 int2);
CREATE TABLE INT4_TBL(f1 int4);
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
CREATE TABLE POINT_TBL(f1 point);
CREATE TABLE TEXT_TBL (f1 text);
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
CREATE TABLE onek (unique1 int4);
CREATE TABLE onek2 AS SELECT * FROM onek;
CREATE TABLE tenk1 (unique1 int4);
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
CREATE TABLE person (name text, age int4,location point);
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
CREATE TABLE student (gpa float8) INHERITS (person);
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
CREATE TABLE road (name text,thepath path);
CREATE TABLE ihighway () INHERITS (road);
CREATE TABLE shighway(surface text) INHERITS (road);
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
DROP TABLE BOOLTBL3;
DROP TABLE BOOLTBL4;
CREATE TABLE ceil_floor_round (a numeric);
DROP TABLE ceil_floor_round;
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
DROP TABLE width_bucket_test;
CREATE TABLE num_input_test (n1 numeric);
CREATE TABLE num_variance (a numeric);
INSERT INTO num_variance VALUES (0);
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
TRUNCATE guid1;
DROP TABLE guid1;
DROP TABLE guid2 CASCADE;
CREATE TABLE numrange_test (nr NUMRANGE);
CREATE INDEX numrange_test_btree on numrange_test(nr);
CREATE TABLE numrange_test2(nr numrange);
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
INSERT INTO numrange_test2 VALUES('[, 5)');
CREATE TABLE textrange_test (tr text);
CREATE INDEX textrange_test_btree on textrange_test(tr);
CREATE TABLE test_range_gist(ir int4range);
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
DROP INDEX test_range_gist_idx;
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
CREATE TABLE test_range_spgist(ir int4range);
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
DROP INDEX test_range_spgist_idx;
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
CREATE TABLE test_range_elem(i int4);
CREATE INDEX test_range_elem_idx on test_range_elem (i);
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
DROP TABLE test_range_elem;
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
CREATE TABLE f_test(f text, i int);
CREATE TABLE i8r_array (f1 int, f2 text);
CREATE TYPE arrayrange as range (subtype=int4[]);
CREATE TYPE two_ints as (a int, b int);
DROP TYPE two_ints cascade;
CREATE TABLE text_support_test (t text);
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
CREATE TABLE TEMP_INT4 (f1 INT4);
CREATE TABLE TEMP_INT2 (f1 INT2);
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
CREATE TABLE POLYGON_TBL(f1 polygon);
CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
"""
with endpoint.cursor() as cur:
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
for line in sql.split("\n"):
if len(line.strip()) == 0 or line.startswith("--"):
continue
cur.execute(line)
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
cur.execute("VACUUM FULL pg_class;")
for ep in env.endpoints.endpoints:
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
env_vars = {
"PGPORT": str(ep.pg_port),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
}
pg_bin.run_capture(pg_dump_command, env=env_vars)

View File

@@ -193,6 +193,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
"test_ancestor_branch_archive_branch1", tenant_id, "test_ancestor_branch_archive_parent"
)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 0
ps_http.timeline_archival_config(
tenant_id,
leaf_timeline_id,
@@ -244,6 +249,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
wait_until(leaf_offloaded)
wait_until(parent_offloaded)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 2
# Offloaded child timelines should still prevent deletion
with pytest.raises(
PageserverApiException,

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"e5374b72997b0afc8374137674e873f7a558120a"
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
],
"v16": [
"16.9",
"15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
],
"v15": [
"15.13",
"daa81cffcf063c54b29a9aabdb6604625f675ad0"
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
],
"v14": [
"14.18",
"4cca6f8083483dda9e12eae292cf788d45bd561f"
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
]
}

View File

@@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zeroize = { version = "1", features = ["derive", "serde"] }