Compare commits

..

16 Commits

Author SHA1 Message Date
John Spray
75638230b2 comments 2025-04-15 09:31:09 +01:00
John Spray
1ad48b2eaf docs/rfcs: add storage encryption key RFC 2025-04-14 13:09:00 +01:00
Vlad Lazar
a338984dc7 pageserver: support keys at different LSNs in one get page batch (#11494)
## Problem

Get page batching stops when we encounter requests at different LSNs.
We are leaving batching factor on the table.

## Summary of changes

The goal is to support keys with different LSNs in a single batch and
still serve them with a single vectored get.
Important restriction: the same key at different LSNs is not supported
in one batch. Returning different key
versions is a much more intrusive change.

Firstly, the read path is changed to support "scattered" queries. This
is a conceptually simple step from
https://github.com/neondatabase/neon/pull/11463. Instead of initializing
the fringe for one keyspace,
we do it for multiple at different LSNs and let the logic already
present into the fringe handle selection.

Secondly, page service code is updated to support batching at different
LSNs. Eeach request parsed from the wire determines its effective
request LSN and keeps it in mem for the batcher toinspect. The batcher
allows keys at
different LSNs in one batch as long one key is not requested at
different LSNs.

I'd suggest doing the first pass commit by commit to get a feel for the
changes.

## Results

I used the batching test from [Christian's
PR](https://github.com/neondatabase/neon/pull/11391) which increases the
change of batch breaks. Looking at the logs I think the new code is at
the max batching factor for the workload (we
only break batches due to them being oversized or because the executor
is idle).

```
Main:
Reasons for stopping batching: {'LSN changed': 22843, 'of batch size': 33417}
test_throughput[release-pg16-50-pipelining_config0-30-100-128-batchable {'max_batch_size': 32, 'execution': 'concurrent-futures', 'mode': 'pipelined'}].perfmetric.batching_factor: 14.6662

My branch:
Reasons for stopping batching: {'of batch size': 37024}
test_throughput[release-pg16-50-pipelining_config0-30-100-128-batchable {'max_batch_size': 32, 'execution': 'concurrent-futures', 'mode': 'pipelined'}].perfmetric.batching_factor: 19.8333
```

Related: https://github.com/neondatabase/neon/issues/10765
2025-04-14 09:05:29 +00:00
Konstantin Knizhnik
8936a7abd8 Increase limit for worker processes for isolation test (#11504)
## Problem

See https://github.com/neondatabase/neon/issues/10652

Neon extension launches 2 BGW which reduce limit for parallel workers
and so affecting parallel_deadlock isolation test.

## Summary of changes

Increase `max_worker_processes` from default 8 to 16 for isolation test.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-12 18:09:12 +00:00
Conrad Ludgate
946e971df8 feat(proxy): add batching to cancellation queue processing (#10607)
Add batching to the redis queue, which allows us to clear it out quicker
should it slow down temporarily.
2025-04-12 09:16:22 +00:00
Dmitrii Kovalkov
d109bf8c1d neon_local: use ed25519 to gen local ssl certs (#11542)
## Problem
neon_local uses rsa to generate local SSL certs, which is slow
Follow-up on:
- https://github.com/neondatabase/neon/pull/11025#discussion_r1989453785
- https://github.com/neondatabase/neon/pull/11538

## Summary of changes
- Change key from rsa to ed25519 in neon_local
2025-04-11 17:49:15 +00:00
Alex Chi Z.
4f7b2cdd4f feat(pageserver): gc-compaction result verification (#11515)
## Problem

Part of #9114 

There was a debug-mode verification mode that verifies at every
retain_lsn. However, the code was tangled within the actual history
generation itself and it's hard to reason about correctness. This patch
adds a separate post-verification of the gc-compaction result that redos
logs at every retain_lsn and every record above the GC horizon. This
ensures that all key history we produce with gc-compaction is readable,
and if there're read errors after gc-compaction, it can only be
read-path errors instead of gc-compaction bugs.

## Summary of changes

* Add gc_compaction_verification flag, default to true.
* Implement a post-verification process.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-11 15:50:29 +00:00
Alex Chi Z.
66f56ddaec fix(pageserver): allow shutdown errors for gc compaction tests (#11530)
## Problem

`test_pageserver_compaction_preempt` is flaky.

## Summary of changes

Allow the shutdown errors.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-11 15:20:51 +00:00
Erik Grinaker
fd16caa7d0 pageserver: yield for L0 during ancestor compaction (#11536)
## Problem

Shard ancestor compaction does not yield for L0 compaction, potentially
starving it.

close https://github.com/neondatabase/neon/issues/11125

## Summary of changes

* Yield for L0 during shard ancestor compaction.
* Return `CompactionOutcome::Pending` when limited by `rewrite_max`, for
eager rescheduling.
2025-04-11 15:09:28 +00:00
Tristan Partin
ff5a527167 Consolidate compute_ctl configuration structures (#11514)
Previously, the structure of the spec file was just the compute spec.
However, the response from the control plane get spec request included
the compute spec and the compute_ctl config. This divergence was
hindering other work such as adding regression tests for compute_ctl
HTTP authorization.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-04-11 15:06:29 +00:00
Arpad Müller
c66444ea15 Add timeline_import http endpoint (#11484)
The added `timleine_import` endpoint allows us to migrate safekeeper
timelines from control plane managed to storcon managed.
 
Part of #9011
2025-04-11 14:10:27 +00:00
Arpad Müller
88f01c1ca1 Introduce WalIngestError (#11506)
Introduces a `WalIngestError` struct together with a
`WalIngestErrorKind` enum, to be used for walingest related failures and
errors.

* the enum captures backtraces, so we don't regress in comparison to
`anyhow::Error`s (backtraces might be a bit shorter if we use one of the
`anyhow::Error` wrappers)
* it explicitly lists most/all of the potential cases that can occur.

I've originally been inspired to do this in #11496, but it's a
longer-term TODO.
2025-04-11 14:08:46 +00:00
Erik Grinaker
a6937a3281 pageserver: improve shard ancestor compaction logging (#11535)
## Problem

Shard ancestor compaction always logs "starting shard ancestor
compaction", even if there is no work to do. This is very spammy (every
20 seconds for every shard). It also has limited progress logging.

## Summary of changes

* Only log "starting shard ancestor compaction" when there's work to do.
* Include details about the amount of work.
* Log progress messages for each layer, and when waiting for uploads.
* Log when compaction is completed, with elapsed duration and whether
there is more work for a later iteration.
2025-04-11 12:14:08 +00:00
Erik Grinaker
3c8565a194 test_runner: propagate config via attach_hook for test fix (#11529)
## Problem

The `pagebench` benchmarks set up an initial dataset by creating a
template tenant, copying the remote storage to a bunch of new tenants,
and attaching them to Pageservers.

In #11420, we found that
`test_pageserver_characterize_throughput_with_n_tenants` had degraded
performance because it set a custom tenant config in Pageservers that
was then replaced with the default tenant config by the storage
controller.

The initial fix was to register the tenants directly in the storage
controller, but this created the tenants with generation 1. This broke
`test_basebackup_with_high_slru_count`, where the template tenant was at
generation 2, leading to all layer files at generation 2 being ignored.

Resolves #11485.
Touches #11381.

## Summary of changes

This patch addresses both test issues by modifying `attach_hook` to also
take a custom tenant config. This allows attaching tenants to
Pageservers from pre-existing remote storage, specifying both the
generation and tenant config when registering them in the storage
controller.
2025-04-11 11:31:12 +00:00
Christian Schwarz
979fa0682b tests: update batching perf test workload to include scattered LSNs (#11391)
The batching perf test workload is currently read-only sequential scans.
However, realistic workloads have concurrent writes (to other pages)
going on.

This PR simulates concurrent writes to other pages by emitting logical
replication messages.

These degrade the achieved batching factor, for the reason see
- https://github.com/neondatabase/neon/issues/10765

PR 
- https://github.com/neondatabase/neon/pull/11494

will fix this problem and get batching factor back up.

---------

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2025-04-11 09:55:49 +00:00
Christian Schwarz
8884865bca tests: make test_pageserver_getpage_throttle less flaky (#11482)
# Refs

- fixes https://github.com/neondatabase/neon/issues/11395

# Problem

Since 2025-03-10, we have observed increased flakiness of
`test_pageserver_getpage_throttle`.

The test is timing-dependent by nature, and was hitting the

```
 assert duration_secs >= 10 * actual_smgr_query_seconds, (
        "smgr metrics should not include throttle wait time"
    )
```

quite frequently.

# Analysis

These failures are not reproducible.

In this PR's history is a commit that reran the test 100 times without
requiring a single retry.

In https://github.com/neondatabase/neon/issues/11395 there is a link to
a query to the test results database.
It shows that the flakiness was not constant, but rather episodic:
2025-03-{10,11,12,13} 2025-03-{19,20,21} 2025-03-31 and 2025-04-01.

To me, this suggests variability in available CPU.

# Solution

The point of the offending assertion is to ensure that most of the
request latency is spent on throttling, because testing of the
throttling mechanism is the point of the test.
The `10` magic number means at most 10% of mean latency may be spent on
request processing.

Ideally we would control the passage of time (virtual clock source) to
make this test deterministic.

But I don't see that happening in our regression test setup.

So, this PR de-flakes the test as follows:
- allot up to 66% of mean latency for request processing
- increase duration from 10s to 20s, hoping to get better protection
from momentary CPU spikes in noisy neighbor tests or VMs on the runner
host

As a drive-by, switch to `pytest.approx` and remove one self-test
assertion I can't make sense of anymore.
2025-04-11 09:38:05 +00:00
42 changed files with 1755 additions and 725 deletions

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -116,7 +116,9 @@ struct Cli {
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
#[arg(short = 'c', long)]
// TODO(tristan957): remove alias after compatibility tests are no longer
// an issue
#[arg(short = 'c', long, alias = "spec-path")]
pub config: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id")]

View File

@@ -710,6 +710,10 @@ impl Endpoint {
}
};
// TODO(tristan957): Remove the write to spec.json after compatibility
// tests work themselves out
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&config.spec)?)?;
let config_path = self.endpoint_path().join("config.json");
std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
@@ -719,6 +723,16 @@ impl Endpoint {
.append(true)
.open(self.endpoint_path().join("compute.log"))?;
// TODO(tristan957): Remove when compatibility tests are no longer an
// issue
let old_compute_ctl = {
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
let help_output = cmd.arg("--help").output()?;
let help_output = String::from_utf8_lossy(&help_output.stdout);
!help_output.contains("--config")
};
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
@@ -737,8 +751,19 @@ impl Endpoint {
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.arg("--config")
.arg(self.endpoint_path().join("config.json").as_os_str())
// TODO(tristan957): Change this to --config when compatibility tests
// are no longer an issue
.args([
"--spec-path",
self.endpoint_path()
.join(if old_compute_ctl {
"spec.json"
} else {
"config.json"
})
.to_str()
.unwrap(),
])
.args([
"--pgbin",
self.env

View File

@@ -980,7 +980,7 @@ fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()>
// -out rootCA.crt -keyout rootCA.key
let keygen_output = Command::new("openssl")
.args([
"req", "-x509", "-newkey", "rsa:2048", "-nodes", "-days", "36500",
"req", "-x509", "-newkey", "ed25519", "-nodes", "-days", "36500",
])
.args(["-subj", "/CN=Neon Local CA"])
.args(["-out", cert_path.to_str().unwrap()])
@@ -1010,7 +1010,7 @@ fn generate_ssl_cert(
// -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
let keygen_output = Command::new("openssl")
.args(["req", "-new", "-nodes"])
.args(["-newkey", "rsa:2048"])
.args(["-newkey", "ed25519"])
.args(["-subj", "/CN=localhost"])
.args(["-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1"])
.args(["-keyout", key_path.to_str().unwrap()])

View File

@@ -535,6 +535,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_compaction_enabled' as bool")?,
gc_compaction_verification: settings
.remove("gc_compaction_verification")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_compaction_verification' as bool")?,
gc_compaction_initial_threshold_kb: settings
.remove("gc_compaction_initial_threshold_kb")
.map(|x| x.parse::<u64>())

View File

@@ -13,7 +13,9 @@ use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{TenantConfigRequest, TimelineCreateRequest, TimelineInfo};
use pageserver_api::models::{
TenantConfig, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
@@ -82,7 +84,8 @@ impl NeonStorageControllerStopArgs {
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: Option<NodeId>,
pub generation_override: Option<i32>,
pub generation_override: Option<i32>, // only new tenants
pub config: Option<TenantConfig>, // only new tenants
}
#[derive(Serialize, Deserialize)]
@@ -805,6 +808,7 @@ impl StorageController {
tenant_shard_id,
node_id: Some(pageserver_id),
generation_override: None,
config: None,
};
let response = self

View File

@@ -81,9 +81,19 @@ sed -i "s/TIMELINE_ID/${timeline_id}/" ${CONFIG_FILE}
cat ${CONFIG_FILE}
# TODO(tristan957): Remove these workarounds for backwards compatibility after
# the next compute release. That includes these next few lines and the
# --spec-path in the compute_ctl invocation.
if compute_ctl --help | grep --quiet -- '--config'; then
SPEC_PATH="$CONFIG_FILE"
else
jq '.spec' < "$CONFIG_FILE" > /tmp/spec.json
SPEC_PATH=/tmp/spec.json
fi
echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
--config "$CONFIG_FILE"
--spec-path "$SPEC_PATH"

View File

@@ -0,0 +1,246 @@
# Storage Encryption Key Management
## Summary
As a precursor to adding new encryption capabilities to Neon's storage services, this RFC proposes
mechanisms for creating and storing fine-grained encryption keys for user data in Neon. We aim
to provide at least tenant granularity, but will use timeline granularity when it is simpler to do
so.
Out of scope:
- We describe lifecycle of keys here but not the encryption of user data with these keys.
- We describe an abstract KMS interface, but not particular platform implementations (such as how
to authenticate with KMS).
## Terminology
_wrapped/unwrapped_: a wrapped encryption key is a key encrypted by another key. For example, the key for
encrypting a timeline's pageserver data might be wrapped by some "root" key for the tenant's user account, stored in a KMS system.
_key hierarchy_: the relationships between keys which wrap each other. For example, a layer file key might
be wrapped by a pageserver timeline key, which is wrapped by a tenant's root key.
## Design Choices
Storage: S3 will be the store of record for wrapped keys
Separate keys: Safekeeper and Pageserver will use independent keys.
AES256: rather than building a generic system for keys, we will assume that all the keys
we manage are AES256 keys -- this is the de-facto standard for enterprise data storage.
Per-object keys: rather than encrypting data objects (layer files and segment files) with
the tenant keys directly, they will be encrypted with separate keys. This avoids cryptographic
safety issues from re-using the same key for large quantities of potentially repetitive plaintext.
Key storage is optional at a per-tenant granularity: eventually this would be on by default, but:
- initially only some environments will have a KMS set up.
- Encryption has some overhead and it may be that some tenants don't want or need it.
## Design
### Summary of format changes
- Pageserver layer files and safekeeper segment objects get new metadata fields to
store wrapped key and version of the wrapping key
- Pageserver timeline index gets a new `keys` field to store wrapped timeline keys
- Safekeeper gets a new per-timeline manifest object in S3 to store wrapped timeline keys
- Pageserver timeline index gets per-layer metadata for wrapped key and wrapping version
### Summary of API changes
- Pageserver TenantConf API gets a new field for account ID
- Pageserver TenantConf API gets a new field for encryption mode
- Safekeeper timeline creation API gets a new field for account ID
- Controller, pageserver & safekeeper get a new timeline-scoped `rotate_key` API
### KMS interface
Neon will interoperate with different KMS APIs on different platforms. We will implement a generic interface,
similar to how `remote_storage` wraps different object storage APIs:
- `generate(accountId, keyType, alias) -> (wrapped key, plaintext key)`
- `unwrap(accountId, ciphertext key) -> plaintext key`
Hereafter, when we talk about generating or unwrapping a key, this means a call into the KMS API.
The KMS deals with abstract "account IDs", which are not equal to tenant IDs and may not be
1:1 with tenants. The account ID will be provided as part of tenant configuration, along
with a field to identify an encryption mode.
### Pageserver key storage
The wrapped pageserver timeline key will be stored in the timeline index object. Because of
key rotation, multiple keys will be stored in an array, with each key having a counter version.
```
"keys": [
{
# The key version: a new key with the next version is generated when rekeying
"version": 1,
# The wrapped key: this is unwrapped by a KMS API call when the key is to be used
"wrapped": "<base64 string>",
# The time the key was generated: this may be used to implement rekeying/key rotation
# policies.
"ctime": "<ISO 8601 timestamp>",
},
...
]
```
Wrapped pageserver layer file keys will be stored in the `index_part` file, as part
of the layer metadata.
```
# LayerFileMetadata
{
"key": {
"version":
}
}
```
To enable re-key procedure to drop deleted versions with old keys, and to avoid mistakes in index_part leading to irretreivable data loss, wrapped keys & version will also be stored
in the object store metadata of uploaded objects.
### Safekeeper key storage
All safekeeper storage is per-timeline. The only concept of a tenant in the safekeeper
is as a namespace for timelines.
As the safekeeper doesn't currently have a flexible metadata object in remote storage,
we will add one. This will initially contain:
- A configuration object that contains the accountId
- An array of keys idential to those used in the pageserver's index.
Because multiple safekeeper processes share the same remote storage path, we must be
sure to handle write races safely. To avoid giving safekeepers a pageserver-like generation
concept (not to be confused with safekeeper's configuration generation), we may use
the conditional write primitive that is available on S3 and ABS, to implement a safe
read-then-write for operations such as key rotation, such that a given key version is
only ever implemented once.
### Key rotation
The process of key rotation is:
1. Load the version of the existing key
2. Generate a new key
3. Store the new key with the previous version incremented by 1
4. **Only once durably stored** use the new key for subsequent generation of object keys
This is the same for safekeepers and pageservers.
A storage controller API will be exposed for key rotation.
For the pageserver, it is very important that key rotation
operations respect generation safety rules, the same as timeline CRUD operations: i.e.
the operation is only durable once the new index_part is uploaded _and_ the generation of the tenant location updated is still the latest generation when the operation is complete.
For the safekeeper, the controller can call into one safekeeper to write the new
key to remote storage, and then when calling into the others they should just load
the key from remote storage. Any safekeeper that is unavailable at the time of a key
rotation will need to be marked "dirty" by the controller and contacted as soon as it
comes back online (key rotation should not be failed if a single safekeeper is down).
### Re-keying
While re-keying and key-rotation are sometimes used synonymously, we distinguish them:
- Key rotation is generating a new key to use for new data
- Re-keying is rewriting existing data so that old keys are no longer used at all
Re-keying is a bulk data operation, and not fully defined in this RFC: it can be defined
quite simply as "For object in objects, if object key version is < the rekeying horizon,
then do a read/write cycle on the object using latest key". This is a simple but potentially very
expensive operation, so we discuss efficiency here.
#### Pageserver re-key
For pageservers, occasional rekeying may be implemented efficiently if one tolerates using
the last few keys and doesn't insist on the latest, because pageservers periodically rewrite
their data for GC-compaction anyway. Thereby an API call to re-key any data with an overly old
key would often be a no-op because all data was rewritten recently anyway.
When object versioning is enabled in storage, re-keying is not fully accomplished by just
re-writing live data: old versions would still contain user data encrypted with older keys. To
fully re-key, an extra step is needed to purge old objects. Ideally, we should only purge
old objects which were encrypted using old keys. To this end, it would be useful to store
the encryption key version as metadata on objects, so that a scrub of deleted object versions
can efficiently select those objects that should be purged during re-key.
Checks on object versions should not only be on deleted objects: because pageserver can emit
"orphan" objects not referenced in the index under some circumstances, re-key must also
check non-deleted objects.
To summarize, the pageserver re-key operation is:
- Iterate over index of layer files, select those with too-old key and rewrite them
- Iterate over all versions in object storage, select those with a too-old key version
in their metadata and purge them (with a safety check that these are not referenced
by the latest index).
It would be wise to combine the re-key procedure with an exhaustive read of a timeline's data,
to ensure that when testing & rolling this feature out we are not rendering anything unreadable
due to bugs in implementation. Since we are deleting old versions in object storage, our
time travel recovery tool will not be any help if we get something wrong in this process.
#### Safekeeper re-key
Re-keying a safekeeper timeline requires an exhaustive walk of segment objects, read
metadata on each one and decide whether it requires rewrite.
Safekeeper currently keeps historic objects forever, so re-keying this data will get
more expensive as time goes on. This would be a good time to add cleanup of old safekeeper
segments, but doing so is beyond the scope of this RFC.
### Enabling encryption for existing tenants
To enable encryption for an existing tenant, we may simply call key-rotation API (to generate a key),
and then re-key API (to rewrite existing data using this key).
## Observability
- To enable some external service to implement re-keying, we should publish metrics per-timeline
on the age of their latest encryption key.
- Calls to KMS should be tracked with typical request rate/result/latency histograms to enable
detection of a slow KMS server and/or errors.
## Alternatives considered
### Use same tenant key for safekeeper and pageserver
We could halve the number of keys in circulation by having the safekeeper and pageserver
share a key rather than working independently.
However, this would be substantially more complex to implement, as safekeepers and pageservers
currently share no storage, so some new communication path would be needed. There is minimal
upside in sharing a key.
### No KMS dependency
We could choose to do all key management ourselves. However, the industry standard approach
to enabling users of cloud SaaS software to self-manage keys is to use the KMS as the intermediary
between our system and the user's control of their key. Although this RFC does not propose user-managed keys, we should design with this in mind.
### Do all key generation/wrapping in KMS service
We could avoid generating and wrapping/unwrapping object keys in our storage
services by delegating all responsibility for key operations to the KMS. However,
KMS services have limited throughput and in some cases may charge per operation, so
it is useful to avoid doing KMS operations per-object, and restrict them to per-timeline
frequency.
### Per-tenant instead of per-timeline pageserver keys
For tenants with many timelines, we may reduce load on KMS service by
using per-tenant instead of per-timeline keys, so that we may do operations
such as creating a timeline without needing to do a KMS unwrap operation.
However, per-timeline key management is much simpler to implement on the safekeeper,
which currently has no concept of a tenant (other than as a namespace for timelines).
It is also slightly simpler to implement on the pageserver, as it avoids implementing
a tenant-scoped creation operation to initialize keys (instead, we may initialize keys
during timeline creation).
As a side benefit, per-timeline key management also enables implementing secure deletion in future
at a per-timeline granularity.

View File

@@ -207,6 +207,10 @@ pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
pub execution: PageServiceProtocolPipelinedExecutionStrategy,
// The default below is such that new versions of the software can start
// with the old configuration.
#[serde(default)]
pub batching: PageServiceProtocolPipelinedBatchingStrategy,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -216,6 +220,19 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
Tasks,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipelinedBatchingStrategy {
/// All get page requests in a batch will be at the same LSN
#[default]
UniformLsn,
/// Get page requests in a batch may be at different LSN
///
/// One key cannot be present more than once at different LSNs in
/// the same batch.
ScatteredLsn,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
@@ -452,6 +469,8 @@ pub struct TenantConfigToml {
// gc-compaction related configs
/// Enable automatic gc-compaction trigger on this tenant.
pub gc_compaction_enabled: bool,
/// Enable verification of gc-compaction results.
pub gc_compaction_verification: bool,
/// The initial threshold for gc-compaction in KB. Once the total size of layers below the gc-horizon is above this threshold,
/// gc-compaction will be triggered.
pub gc_compaction_initial_threshold_kb: u64,
@@ -613,9 +632,12 @@ impl Default for ConfigToml {
page_service_pipelining: if !cfg!(test) {
PageServicePipeliningConfig::Serial
} else {
// Do not turn this into the default until scattered reads have been
// validated and rolled-out fully.
PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
batching: PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn,
})
},
get_vectored_concurrent_io: if !cfg!(test) {
@@ -692,6 +714,7 @@ pub mod tenant_conf_defaults {
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false;
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
}
@@ -746,6 +769,7 @@ impl Default for TenantConfigToml {
wal_receiver_protocol_override: None,
rel_size_v2_enabled: false,
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
gc_compaction_verification: DEFAULT_GC_COMPACTION_VERIFICATION,
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,

View File

@@ -7,7 +7,8 @@ use std::time::{Duration, Instant};
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};
@@ -499,6 +500,15 @@ pub struct SafekeeperSchedulingPolicyRequest {
pub scheduling_policy: SkSchedulingPolicy,
}
/// Import request for safekeeper timelines.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineImportRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub start_lsn: Lsn,
pub sk_set: Vec<NodeId>,
}
#[cfg(test)]
mod test {
use serde_json;

View File

@@ -927,7 +927,7 @@ impl Key {
/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
pub fn to_rel_block(self) -> Result<(RelTag, BlockNumber), ToRelBlockError> {
Ok(match self.field1 {
0x00 => (
RelTag {
@@ -938,7 +938,7 @@ impl Key {
},
self.field6,
),
_ => anyhow::bail!("unexpected value kind 0x{:02x}", self.field1),
_ => return Err(ToRelBlockError(self.field1)),
})
}
}
@@ -951,6 +951,17 @@ impl std::str::FromStr for Key {
}
}
#[derive(Debug)]
pub struct ToRelBlockError(u8);
impl fmt::Display for ToRelBlockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "unexpected value kind 0x{:02x}", self.0)
}
}
impl std::error::Error for ToRelBlockError {}
#[cfg(test)]
mod tests {
use std::str::FromStr;

View File

@@ -576,6 +576,8 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_compaction_enabled: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_compaction_verification: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_compaction_ratio_percent: FieldPatch<u64>,
@@ -696,6 +698,9 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_compaction_enabled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_compaction_verification: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_compaction_initial_threshold_kb: Option<u64>,
@@ -744,6 +749,7 @@ impl TenantConfig {
mut wal_receiver_protocol_override,
mut rel_size_v2_enabled,
mut gc_compaction_enabled,
mut gc_compaction_verification,
mut gc_compaction_initial_threshold_kb,
mut gc_compaction_ratio_percent,
mut sampling_ratio,
@@ -835,6 +841,9 @@ impl TenantConfig {
patch
.gc_compaction_enabled
.apply(&mut gc_compaction_enabled);
patch
.gc_compaction_verification
.apply(&mut gc_compaction_verification);
patch
.gc_compaction_initial_threshold_kb
.apply(&mut gc_compaction_initial_threshold_kb);
@@ -876,6 +885,7 @@ impl TenantConfig {
wal_receiver_protocol_override,
rel_size_v2_enabled,
gc_compaction_enabled,
gc_compaction_verification,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
sampling_ratio,
@@ -974,6 +984,9 @@ impl TenantConfig {
gc_compaction_enabled: self
.gc_compaction_enabled
.unwrap_or(global_conf.gc_compaction_enabled),
gc_compaction_verification: self
.gc_compaction_verification
.unwrap_or(global_conf.gc_compaction_verification),
gc_compaction_initial_threshold_kb: self
.gc_compaction_initial_threshold_kb
.unwrap_or(global_conf.gc_compaction_initial_threshold_kb),

View File

@@ -34,7 +34,7 @@ use utils::lsn::Lsn;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
use crate::tenant::{PageReconstructError, Timeline};
#[derive(Debug, thiserror::Error)]
@@ -353,9 +353,10 @@ where
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
for part in slru_partitions.parts {
let query = VersionedKeySpaceQuery::uniform(part, self.lsn);
let blocks = self
.timeline
.get_vectored(part, self.lsn, self.io_concurrency.clone(), self.ctx)
.get_vectored(query, self.io_concurrency.clone(), self.ctx)
.await?;
for (key, block) in blocks {

View File

@@ -3253,7 +3253,7 @@ async fn ingest_aux_files(
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
}
modification
.commit(&ctx)

View File

@@ -27,7 +27,7 @@ use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walingest::{WalIngest, WalIngestErrorKind};
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
@@ -157,9 +157,9 @@ async fn import_rel(
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
match e.kind {
WalIngestErrorKind::RelationAlreadyExists(rel) => {
debug!("Relation {rel} already exists. We must be extending it.")
}
_ => return Err(e.into()),
}

View File

@@ -17,7 +17,7 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -1863,7 +1863,7 @@ pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::
"pageserver_page_service_config_max_batch_size",
"Configured maximum batch size for the server-side batching functionality of page_service. \
Labels expose more of the configuration parameters.",
&["mode", "execution"]
&["mode", "execution", "batching"]
)
.expect("failed to define a metric")
});
@@ -1871,10 +1871,11 @@ pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::
fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE.reset();
let (label_values, value) = match conf {
PageServicePipeliningConfig::Serial => (["serial", "-"], 1),
PageServicePipeliningConfig::Serial => (["serial", "-", "-"], 1),
PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
max_batch_size,
execution,
batching,
}) => {
let mode = "pipelined";
let execution = match execution {
@@ -1883,7 +1884,12 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
}
PageServiceProtocolPipelinedExecutionStrategy::Tasks => "tasks",
};
([mode, execution], max_batch_size.get())
let batching = match batching {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => "uniform-lsn",
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => "scattered-lsn",
};
([mode, execution, batching], max_batch_size.get())
}
};
PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE

View File

@@ -18,7 +18,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{
@@ -641,6 +641,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
ctx: RequestContext,
}
@@ -670,7 +671,6 @@ enum BatchedFeMessage {
GetPage {
span: Span,
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
},
DbSize {
@@ -1025,34 +1025,28 @@ impl PageServerHandler {
.await?;
// We're holding the Handle
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
let res = Self::wait_or_get_last_lsn(
let effective_request_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
&ctx,
)
.maybe_perf_instrument(&ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"WAIT_LSN",
)
})
.await;
let effective_request_lsn = match res {
) {
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
}
};
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
effective_request_lsn,
pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
ctx,
}],
}
}
#[cfg(feature = "testing")]
@@ -1078,6 +1072,7 @@ impl PageServerHandler {
#[instrument(skip_all, level = tracing::Level::TRACE)]
#[allow(clippy::boxed_local)]
fn pagestream_do_batch(
batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
max_batch_size: NonZeroUsize,
batch: &mut Result<BatchedFeMessage, QueryError>,
this_msg: Result<BatchedFeMessage, QueryError>,
@@ -1096,33 +1091,61 @@ impl PageServerHandler {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= max_batch_size.get() {
trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
trace!(%max_batch_size, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if !accum_shard.is_same_handle_as(&this_shard) {
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
{
return false;
}
}
}
PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
// The read path doesn't curently support serving the same page at different LSNs.
// While technically possible, it's uncertain if the complexity is worth it.
// Break the batch if such a case is encountered.
//
// TODO(vlad): Include a metric for batch breaks with a reason label.
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
"stopping batching because same page was requested at different LSNs"
);
return false;
}
}
}
true
})() =>
{
@@ -1390,12 +1413,7 @@ impl PageServerHandler {
span,
)
}
BatchedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
BatchedFeMessage::GetPage { span, shard, pages } => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
(
@@ -1405,7 +1423,6 @@ impl PageServerHandler {
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
io_concurrency,
&ctx,
@@ -1724,6 +1741,7 @@ impl PageServerHandler {
let PageServicePipeliningConfigPipelined {
max_batch_size,
execution,
batching: batching_strategy,
} = pipelining_config;
// Macro to _define_ a pipeline stage.
@@ -1775,7 +1793,7 @@ impl PageServerHandler {
exit |= read_res.is_err();
let could_send = batch_tx
.send(read_res, |batch, res| {
Self::pagestream_do_batch(max_batch_size, batch, res)
Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
})
.await;
exit |= could_send.is_err();
@@ -1871,7 +1889,39 @@ impl PageServerHandler {
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
let last_record_lsn = timeline.get_last_record_lsn();
let effective_request_lsn = Self::effective_request_lsn(
timeline,
last_record_lsn,
request_lsn,
not_modified_since,
latest_gc_cutoff_lsn,
)?;
if effective_request_lsn > last_record_lsn {
timeline
.wait_lsn(
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;
// Since we waited for 'effective_request_lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the last-record LSN can
// advance immediately after we return anyway)
}
Ok(effective_request_lsn)
}
fn effective_request_lsn(
timeline: &Timeline,
last_record_lsn: Lsn,
request_lsn: Lsn,
not_modified_since: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> Result<Lsn, PageStreamError> {
// Sanity check the request
if request_lsn < not_modified_since {
return Err(PageStreamError::BadRequest(
@@ -1906,19 +1956,7 @@ impl PageServerHandler {
}
}
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
if not_modified_since > last_record_lsn {
timeline
.wait_lsn(
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;
// Since we waited for 'not_modified_since' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the last-record LSN can
// advance immediately after we return anyway)
Ok(not_modified_since)
} else {
// It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
@@ -2073,7 +2111,6 @@ impl PageServerHandler {
async fn handle_get_page_at_lsn_request_batched(
&mut self,
timeline: &Timeline,
effective_lsn: Lsn,
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
@@ -2092,20 +2129,81 @@ impl PageServerHandler {
// Ignore error (trace buffer may be full or tracer may have disconnected).
_ = page_trace.try_send(PageTraceEvent {
key,
effective_lsn,
effective_lsn: batch.effective_request_lsn,
time,
});
}
}
// If any request in the batch needs to wait for LSN, then do so now.
let mut perf_instrument = false;
let max_effective_lsn = requests
.iter()
.map(|req| {
if req.ctx.has_perf_span() {
perf_instrument = true;
}
req.effective_request_lsn
})
.max()
.expect("batch is never empty");
let ctx = match perf_instrument {
true => RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_VECTORED",
tenant_id = %timeline.tenant_shard_id.tenant_id,
timeline_id = %timeline.timeline_id,
shard = %timeline.tenant_shard_id.shard_slug(),
%max_effective_lsn
)
})
.attached_child(),
false => ctx.attached_child(),
};
let last_record_lsn = timeline.get_last_record_lsn();
if max_effective_lsn > last_record_lsn {
if let Err(e) = timeline
.wait_lsn(
max_effective_lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
timeline::WaitLsnTimeout::Default,
&ctx,
)
.maybe_perf_instrument(&ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"WAIT_LSN",
)
})
.await
{
return Vec::from_iter(requests.into_iter().map(|req| {
Err(BatchedPageStreamError {
err: PageStreamError::from(e.clone()),
req: req.req.hdr,
})
}));
}
}
let results = timeline
.get_rel_page_at_lsn_batched(
requests
.iter()
.map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
effective_lsn,
requests.iter().map(|p| {
(
&p.req.rel,
&p.req.blkno,
p.effective_request_lsn,
p.ctx.attached_child(),
)
}),
io_concurrency,
ctx,
&ctx,
)
.await;
assert_eq!(results.len(), requests.len());

View File

@@ -6,14 +6,14 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::collections::{HashMap, HashSet, hash_map};
use std::ops::{ControlFlow, Range};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, ensure};
use crate::walingest::{WalIngestError, WalIngestErrorKind};
use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
@@ -21,7 +21,7 @@ use pageserver_api::key::{
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::context::{PerfInstrumentFutureExt, RequestContext};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -50,7 +50,7 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -136,12 +136,8 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
AlreadyExists,
#[error("invalid relnode")]
InvalidRelnode,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
@@ -210,10 +206,9 @@ impl Timeline {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, ctx.attached_child())),
effective_lsn,
pages.iter().map(|(tag, blknum)| {
(tag, blknum, effective_lsn, ctx.attached_child())
}),
io_concurrency.clone(),
ctx,
)
@@ -251,8 +246,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, RequestContext)>,
effective_lsn: Lsn,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -265,11 +259,13 @@ impl Timeline {
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
BTreeMap::default();
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
HashMap::with_capacity(pages.len());
let mut perf_instrument = false;
for (response_slot_idx, (tag, blknum, ctx)) in pages.enumerate() {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -280,14 +276,14 @@ impl Timeline {
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(effective_lsn), &ctx)
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%effective_lsn,
lsn=%lsn,
)
})
.await
@@ -303,7 +299,7 @@ impl Timeline {
if *blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, effective_lsn, nblocks
tag, blknum, lsn, nblocks
);
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
slots_filled += 1;
@@ -312,46 +308,29 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
if ctx.has_perf_span() {
perf_instrument = true;
}
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
let acc = req_keyspaces.entry(lsn).or_default();
acc.add_key(key);
}
let keyspace = {
// add_key requires monotonicity
let mut acc = KeySpaceAccum::new();
for key in keys_slots
.keys()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(*key);
}
acc.to_keyspace()
};
let ctx = match perf_instrument {
true => RequestContextBuilder::from(ctx)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"GET_VECTORED",
tenant_id = %self.tenant_shard_id.tenant_id,
timeline_id = %self.timeline_id,
lsn = %effective_lsn,
shard = %self.tenant_shard_id.shard_slug(),
)
})
.attached_child(),
false => ctx.attached_child(),
};
let query: Vec<(Lsn, KeySpace)> = req_keyspaces
.into_iter()
.map(|(lsn, acc)| (lsn, acc.to_keyspace()))
.collect();
let query = VersionedKeySpaceQuery::scattered(query);
let res = self
.get_vectored(keyspace, effective_lsn, io_concurrency, &ctx)
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.get_vectored(query, io_concurrency, ctx)
.maybe_perf_instrument(ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.await;
match res {
@@ -381,12 +360,12 @@ impl Timeline {
// There is no standardized way to express that the batched span followed from N request spans.
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
req_ctx.perf_follows_from(&ctx);
req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(&ctx);
first_req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
}
@@ -425,7 +404,7 @@ impl Timeline {
}
};
req_ctx.perf_follows_from(&ctx);
req_ctx.perf_follows_from(ctx);
result_slots[*slot].write(err);
}
@@ -664,8 +643,9 @@ impl Timeline {
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
for batch in batches.parts {
let query = VersionedKeySpaceQuery::uniform(batch, lsn);
let blocks = self
.get_vectored(batch, lsn, io_concurrency.clone(), ctx)
.get_vectored(query, io_concurrency.clone(), ctx)
.await?;
for (_key, block) in blocks {
@@ -902,8 +882,9 @@ impl Timeline {
);
for batch in batches.parts.into_iter().rev() {
let query = VersionedKeySpaceQuery::uniform(batch, probe_lsn);
let blocks = self
.get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx)
.get_vectored(query, io_concurrency.clone(), ctx)
.await?;
for (_key, clog_page) in blocks.into_iter().rev() {
@@ -1478,8 +1459,8 @@ impl DatadirModification<'_> {
}
/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> Result<(), WalIngestError> {
ensure_walingest!(
lsn >= self.lsn,
"setting an older lsn {} than {} is not allowed",
lsn,
@@ -1578,7 +1559,7 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
ctx: &RequestContext,
) -> Result<u32, PageReconstructError> {
) -> Result<u32, WalIngestError> {
// Get current size and put rel creation if rel doesn't exist
//
// NOTE: we check the cache first even though get_rel_exists and get_rel_size would
@@ -1593,14 +1574,13 @@ impl DatadirModification<'_> {
.await?
{
// create it with 0 size initially, the logic below will extend it
self.put_rel_creation(rel, 0, ctx)
.await
.context("Relation Error")?;
self.put_rel_creation(rel, 0, ctx).await?;
Ok(0)
} else {
self.tline
Ok(self
.tline
.get_rel_size(rel, Version::Modified(self), ctx)
.await
.await?)
}
}
@@ -1637,11 +1617,14 @@ impl DatadirModification<'_> {
// TODO(vlad): remove this argument and replace the shard check with is_key_local
shard: &ShardIdentity,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let mut gaps_at_lsns = Vec::default();
for meta in batch.metadata.iter() {
let (rel, blkno) = Key::from_compact(meta.key()).to_rel_block()?;
let key = Key::from_compact(meta.key());
let (rel, blkno) = key
.to_rel_block()
.map_err(|_| WalIngestErrorKind::InvalidKey(key, meta.lsn()))?;
let new_nblocks = blkno + 1;
let old_nblocks = self.create_relation_if_required(rel, ctx).await?;
@@ -1683,8 +1666,8 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
Ok(())
}
@@ -1696,7 +1679,7 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
@@ -1714,14 +1697,11 @@ impl DatadirModification<'_> {
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
@@ -1733,15 +1713,12 @@ impl DatadirModification<'_> {
segno: u32,
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
self.put(key, Value::Image(img));
Ok(())
@@ -1751,15 +1728,11 @@ impl DatadirModification<'_> {
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1776,15 +1749,11 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
Err(WalIngestErrorKind::InvalidKey(key, self.lsn))?;
}
let batch = self
@@ -1832,8 +1801,10 @@ impl DatadirModification<'_> {
dbnode: Oid,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
@@ -1874,13 +1845,13 @@ impl DatadirModification<'_> {
xid: u64,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Add it to the directory entry
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1891,7 +1862,7 @@ impl DatadirModification<'_> {
let xid = xid as u32;
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
Err(WalIngestErrorKind::FileAlreadyExists(xid.into()))?;
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
@@ -1909,22 +1880,22 @@ impl DatadirModification<'_> {
&mut self,
origin_id: RepOriginId,
origin_lsn: Lsn,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = repl_origin_key(origin_id);
self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
Ok(())
}
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> Result<(), WalIngestError> {
self.set_replorigin(origin_id, Lsn::INVALID).await
}
pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_control_file(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CONTROLFILE_KEY, Value::Image(img));
Ok(())
}
pub fn put_checkpoint(&mut self, img: Bytes) -> anyhow::Result<()> {
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<(), WalIngestError> {
self.put(CHECKPOINT_KEY, Value::Image(img));
Ok(())
}
@@ -1934,7 +1905,7 @@ impl DatadirModification<'_> {
spcnode: Oid,
dbnode: Oid,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), ctx)
@@ -1973,20 +1944,21 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> Result<(), RelationError> {
) -> Result<(), WalIngestError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"invalid relnode"
)))?;
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
let dbdir_exists =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
let buf = DbDirectory::ser(&dbdir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
@@ -2003,27 +1975,25 @@ impl DatadirModification<'_> {
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
};
let v2_enabled = self.maybe_enable_rel_size_v2()?;
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
if v2_enabled {
if rel_dir.rels.contains(&(rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self
.sparse_get(sparse_rel_dir_key, ctx)
.await
.map_err(|e| RelationError::Other(e.into()))?;
let val = self.sparse_get(sparse_rel_dir_key, ctx).await?;
let val = RelDirExists::decode_option(val)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidRelDirKey(sparse_rel_dir_key))?;
if val == RelDirExists::Exists {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
self.put(
sparse_rel_dir_key,
@@ -2039,9 +2009,7 @@ impl DatadirModification<'_> {
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&RelDirectory::default())?)),
);
}
self.pending_directory_entries
@@ -2049,7 +2017,7 @@ impl DatadirModification<'_> {
} else {
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
Err(WalIngestErrorKind::RelationAlreadyExists(rel))?;
}
if !dbdir_exists {
self.pending_directory_entries
@@ -2059,9 +2027,7 @@ impl DatadirModification<'_> {
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
}
@@ -2086,8 +2052,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), ctx)
@@ -2117,8 +2083,8 @@ impl DatadirModification<'_> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
) -> Result<(), WalIngestError> {
ensure_walingest!(rel.relnode != 0, RelationError::InvalidRelnode);
// Put size
let size_key = rel_size_to_key(rel);
@@ -2142,8 +2108,10 @@ impl DatadirModification<'_> {
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let v2_enabled = self.maybe_enable_rel_size_v2()?;
) -> Result<(), WalIngestError> {
let v2_enabled = self
.maybe_enable_rel_size_v2()
.map_err(WalIngestErrorKind::MaybeRelSizeV2Error)?;
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
@@ -2163,7 +2131,7 @@ impl DatadirModification<'_> {
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
.map_err(|_| WalIngestErrorKind::InvalidKey(key, self.lsn))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
@@ -2206,7 +2174,7 @@ impl DatadirModification<'_> {
segno: u32,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Add it to the directory entry
@@ -2215,7 +2183,7 @@ impl DatadirModification<'_> {
let mut dir = SlruSegmentDirectory::des(&buf)?;
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
Err(WalIngestErrorKind::SlruAlreadyExists(kind, segno))?;
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
@@ -2242,7 +2210,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Put size
@@ -2258,7 +2226,7 @@ impl DatadirModification<'_> {
kind: SlruKind,
segno: u32,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key, ctx).await?;
@@ -2283,7 +2251,7 @@ impl DatadirModification<'_> {
}
/// Drop a relmapper file (pg_filenode.map)
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> anyhow::Result<()> {
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<(), WalIngestError> {
// TODO
Ok(())
}
@@ -2293,7 +2261,7 @@ impl DatadirModification<'_> {
&mut self,
xid: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
@@ -2308,7 +2276,8 @@ impl DatadirModification<'_> {
));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
let xid: u32 = u32::try_from(xid)
.map_err(|e| WalIngestErrorKind::LogicalError(anyhow::Error::from(e)))?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
@@ -2333,7 +2302,7 @@ impl DatadirModification<'_> {
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
@@ -2342,7 +2311,7 @@ impl DatadirModification<'_> {
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
aux_file::decode_file_value(old_val).map_err(WalIngestErrorKind::EncodeAuxFileError)?
} else {
Vec::new()
};
@@ -2387,7 +2356,8 @@ impl DatadirModification<'_> {
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
let new_val = aux_file::encode_file_value(&new_files)
.map_err(WalIngestErrorKind::EncodeAuxFileError)?;
self.put(key, Value::Image(new_val.into()));
Ok(())

View File

@@ -5948,7 +5948,7 @@ mod tests {
use timeline::InMemoryLayerTestDesc;
#[cfg(feature = "testing")]
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
use timeline::{CompactOptions, DeltaLayerTestDesc};
use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery};
use utils::id::TenantId;
use super::*;
@@ -6786,10 +6786,11 @@ mod tests {
for read in reads {
info!("Doing vectored read on {:?}", read);
let query = VersionedKeySpaceQuery::uniform(read.clone(), reads_lsn);
let vectored_res = tline
.get_vectored_impl(
read.clone(),
reads_lsn,
query,
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
@@ -6868,10 +6869,11 @@ mod tests {
};
let read_lsn = child_timeline.get_last_record_lsn();
let query = VersionedKeySpaceQuery::uniform(aux_keyspace.clone(), read_lsn);
let vectored_res = child_timeline
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
query,
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
@@ -7017,10 +7019,12 @@ mod tests {
let read = KeySpace {
ranges: vec![key_near_gap..gap_at_key.next(), key_near_end..current_key],
};
let query = VersionedKeySpaceQuery::uniform(read.clone(), current_lsn);
let results = child_timeline
.get_vectored_impl(
read.clone(),
current_lsn,
query,
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
@@ -7151,12 +7155,16 @@ mod tests {
}
for query_lsn in query_lsns {
let query = VersionedKeySpaceQuery::uniform(
KeySpace {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
);
let results = child_timeline
.get_vectored_impl(
KeySpace {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
query,
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
@@ -7655,10 +7663,11 @@ mod tests {
}
let mut cnt = 0;
let query = VersionedKeySpaceQuery::uniform(keyspace.clone(), lsn);
for (key, value) in tline
.get_vectored_impl(
keyspace.clone(),
lsn,
query,
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
@@ -7865,8 +7874,9 @@ mod tests {
io_concurrency: IoConcurrency,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(keyspace.clone(), lsn);
let res = tline
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.get_vectored_impl(query, &mut reconstruct_state, ctx)
.await?;
Ok((res, reconstruct_state.get_delta_layers_visited() as usize))
}
@@ -8163,13 +8173,10 @@ mod tests {
// test vectored scan on parent timeline
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
let query =
VersionedKeySpaceQuery::uniform(KeySpace::single(Key::metadata_key_range()), lsn);
let res = tline
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
lsn,
&mut reconstruct_state,
&ctx,
)
.get_vectored_impl(query, &mut reconstruct_state, &ctx)
.await?;
assert_eq!(
@@ -8189,13 +8196,10 @@ mod tests {
// test vectored scan on child timeline
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
let query =
VersionedKeySpaceQuery::uniform(KeySpace::single(Key::metadata_key_range()), lsn);
let res = child
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
lsn,
&mut reconstruct_state,
&ctx,
)
.get_vectored_impl(query, &mut reconstruct_state, &ctx)
.await?;
assert_eq!(
@@ -8229,13 +8233,9 @@ mod tests {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
lsn,
&mut reconstruct_state,
ctx,
)
.get_vectored_impl(query, &mut reconstruct_state, ctx)
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
@@ -9257,6 +9257,7 @@ mod tests {
&[Lsn(0x20), Lsn(0x40), Lsn(0x50)],
3,
None,
true,
)
.await
.unwrap();
@@ -9381,7 +9382,15 @@ mod tests {
),
];
let res = tline
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3, None)
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[Lsn(0x40), Lsn(0x50)],
3,
None,
true,
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
@@ -9460,6 +9469,7 @@ mod tests {
&[],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
true,
)
.await
.unwrap();
@@ -9508,6 +9518,7 @@ mod tests {
&[Lsn(0x30)],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
true,
)
.await
.unwrap();
@@ -10358,14 +10369,13 @@ mod tests {
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let query = VersionedKeySpaceQuery::uniform(
KeySpace::single(get_key(0)..get_key(10)),
delta_layer_end_lsn,
);
let results = tline
.get_vectored(
keyspace,
delta_layer_end_lsn,
IoConcurrency::sequential(),
&ctx,
)
.get_vectored(query, IoConcurrency::sequential(), &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
@@ -10513,9 +10523,13 @@ mod tests {
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let query = VersionedKeySpaceQuery::uniform(
KeySpace::single(get_key(0)..get_key(10)),
last_record_lsn,
);
let results = tline
.get_vectored(keyspace, last_record_lsn, IoConcurrency::sequential(), &ctx)
.get_vectored(query, IoConcurrency::sequential(), &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {

View File

@@ -715,13 +715,34 @@ pub(crate) enum LayerId {
}
/// Uniquely identify a layer visit by the layer
/// and LSN floor (or start LSN) of the reads.
/// The layer itself is not enough since we may
/// have different LSN lower bounds for delta layer reads.
/// and LSN range of the reads. Note that the end of the range is exclusive.
///
/// The layer itself is not enough since we may have different LSN lower
/// bounds for delta layer reads. Scenarios where this can happen are:
///
/// 1. Layer overlaps: imagine an image layer inside and in-memory layer
/// and a query that only partially hits the image layer. Part of the query
/// needs to read the whole in-memory layer and the other part needs to read
/// only up to the image layer. Hence, they'll have different LSN floor values
/// for the read.
///
/// 2. Scattered reads: the read path supports starting at different LSNs. Imagine
/// The start LSN for one range is inside a layer and the start LSN for another range
/// Is above the layer (includes all of it). Both ranges need to read the layer all the
/// Way to the end but starting at different points. Hence, they'll have different LSN
/// Ceil values.
///
/// The implication is that we might visit the same layer multiple times
/// in order to read different LSN ranges from it. In practice, this isn't very concerning
/// because:
/// 1. Layer overlaps are rare and generally not intended
/// 2. Scattered reads will stabilise after the first few layers provided their starting LSNs
/// are grouped tightly enough (likely the case).
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct LayerToVisitId {
layer_id: LayerId,
lsn_floor: Lsn,
lsn_ceil: Lsn,
}
#[derive(Debug, PartialEq, Eq, Hash)]
@@ -805,6 +826,7 @@ impl LayerFringe {
let layer_to_visit_id = LayerToVisitId {
layer_id: layer.id(),
lsn_floor: lsn_range.start,
lsn_ceil: lsn_range.end,
};
let entry = self.visit_reads.entry(layer_to_visit_id.clone());

View File

@@ -585,7 +585,7 @@ pub(crate) enum PageReconstructError {
WalRedo(anyhow::Error),
#[error("{0}")]
MissingKey(MissingKeyError),
MissingKey(Box<MissingKeyError>),
}
impl From<anyhow::Error> for PageReconstructError {
@@ -690,16 +690,23 @@ impl std::fmt::Display for ReadPath {
#[derive(thiserror::Error)]
pub struct MissingKeyError {
key: Key,
keyspace: KeySpace,
shard: ShardNumber,
cont_lsn: Lsn,
request_lsn: Lsn,
query: Option<VersionedKeySpaceQuery>,
// This is largest request LSN from the get page request batch
original_hwm_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
/// Debug information about the read path if there's an error
read_path: Option<ReadPath>,
backtrace: Option<std::backtrace::Backtrace>,
}
impl MissingKeyError {
fn enrich(&mut self, query: VersionedKeySpaceQuery) {
self.query = Some(query);
}
}
impl std::fmt::Debug for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
@@ -710,14 +717,18 @@ impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"could not find data for key {} (shard {:?}) at LSN {}, request LSN {}",
self.key, self.shard, self.cont_lsn, self.request_lsn
"could not find data for key {} (shard {:?}), original HWM LSN {}",
self.keyspace, self.shard, self.original_hwm_lsn
)?;
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
write!(f, ", ancestor {}", ancestor_lsn)?;
}
if let Some(ref query) = self.query {
write!(f, ", query {}", query)?;
}
if let Some(ref read_path) = self.read_path {
write!(f, "\n{}", read_path)?;
}
@@ -817,7 +828,7 @@ pub(crate) enum GetVectoredError {
InvalidLsn(Lsn),
#[error("requested key not found: {0}")]
MissingKey(MissingKeyError),
MissingKey(Box<MissingKeyError>),
#[error("ancestry walk")]
GetReadyAncestorError(#[source] GetReadyAncestorError),
@@ -928,7 +939,7 @@ impl std::fmt::Debug for Timeline {
}
}
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
@@ -1128,14 +1139,12 @@ impl Timeline {
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::sequential());
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.get_vectored_impl(query, &mut reconstruct_state, ctx)
.await;
let key_value = vectored_res?.pop_first();
@@ -1153,15 +1162,17 @@ impl Timeline {
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
read_path: None,
})),
None => Err(PageReconstructError::MissingKey(Box::new(
MissingKeyError {
keyspace: KeySpace::single(key..key.next()),
shard: self.shard_identity.get_shard_number(&key),
original_hwm_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
read_path: None,
query: None,
},
))),
}
}
@@ -1174,21 +1185,18 @@ impl Timeline {
/// which actually vectorizes the read path.
pub(crate) async fn get_vectored(
&self,
keyspace: KeySpace,
lsn: Lsn,
query: VersionedKeySpaceQuery,
io_concurrency: super::storage_layer::IoConcurrency,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn));
}
let total_keyspace = query.total_keyspace();
let key_count = keyspace.total_raw_size().try_into().unwrap();
let key_count = total_keyspace.total_raw_size().try_into().unwrap();
if key_count > Timeline::MAX_GET_VECTORED_KEYS {
return Err(GetVectoredError::Oversized(key_count));
}
for range in &keyspace.ranges {
for range in &total_keyspace.ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
@@ -1197,9 +1205,8 @@ impl Timeline {
}
trace!(
"get vectored request for {:?}@{} from task kind {:?}",
keyspace,
lsn,
"get vectored query {} from task kind {:?}",
query,
ctx.task_kind(),
);
@@ -1208,12 +1215,7 @@ impl Timeline {
.map(|metric| (metric, Instant::now()));
let res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(io_concurrency),
ctx,
)
.get_vectored_impl(query, &mut ValuesReconstructState::new(io_concurrency), ctx)
.await;
if let Some((metric, start)) = start {
@@ -1264,13 +1266,10 @@ impl Timeline {
.for_task_kind(ctx.task_kind())
.map(ScanLatencyOngoingRecording::start_recording);
let query = VersionedKeySpaceQuery::uniform(keyspace, lsn);
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(io_concurrency),
ctx,
)
.get_vectored_impl(query, &mut ValuesReconstructState::new(io_concurrency), ctx)
.await;
if let Some(recording) = start {
@@ -1282,16 +1281,19 @@ impl Timeline {
pub(super) async fn get_vectored_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
query: VersionedKeySpaceQuery,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(keyspace.clone(), lsn))
Some(ReadPath::new(
query.total_keyspace(),
query.high_watermark_lsn()?,
))
} else {
None
};
reconstruct_state.read_path = read_path;
let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction {
@@ -1311,7 +1313,7 @@ impl Timeline {
})
.attached_child();
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, &ctx)
self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
};
@@ -1324,6 +1326,13 @@ impl Timeline {
.map(|state| state.collect_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
// Enrich the missing key error with the original query.
if let GetVectoredError::MissingKey(mut missing_err) = err {
missing_err.enrich(query.clone());
return Err(GetVectoredError::MissingKey(missing_err));
}
return Err(err);
};
@@ -1341,6 +1350,8 @@ impl Timeline {
let futs = FuturesUnordered::new();
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
let req_lsn_for_key = query.map_key_to_lsn(&key);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
let ctx = RequestContextBuilder::from(&ctx)
@@ -1387,7 +1398,7 @@ impl Timeline {
let walredo_deltas = converted.num_deltas();
let walredo_res = walredo_self
.reconstruct_value(key, lsn, converted, redo_attempt_type)
.reconstruct_value(key, req_lsn_for_key, converted, redo_attempt_type)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
@@ -1414,15 +1425,18 @@ impl Timeline {
// to avoid infinite results.
if !results.is_empty() {
if layers_visited >= Self::LAYERS_VISITED_WARN_THRESHOLD {
let total_keyspace = query.total_keyspace();
let max_request_lsn = query.high_watermark_lsn().expect("Validated previously");
static LOG_PACER: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(60))));
LOG_PACER.lock().unwrap().call(|| {
let num_keys = keyspace.total_raw_size();
let num_keys = total_keyspace.total_raw_size();
let num_pages = results.len();
tracing::info!(
shard_id = %self.tenant_shard_id.shard_slug(),
lsn = %lsn,
"Vectored read for {keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
lsn = %max_request_lsn,
"Vectored read for {total_keyspace} visited {layers_visited} layers. Returned {num_pages}/{num_keys} pages.",
);
});
}
@@ -2723,6 +2737,10 @@ impl Timeline {
.tenant_conf
.gc_compaction_enabled
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_enabled);
let gc_compaction_verification = tenant_conf
.tenant_conf
.gc_compaction_verification
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_verification);
let gc_compaction_initial_threshold_kb = tenant_conf
.tenant_conf
.gc_compaction_initial_threshold_kb
@@ -2737,6 +2755,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_ratio_percent);
GcCompactionCombinedSettings {
gc_compaction_enabled,
gc_compaction_verification,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
}
@@ -3935,6 +3954,154 @@ impl Timeline {
}
}
#[derive(Clone)]
/// Type representing a query in the ([`Lsn`], [`Key`]) space.
/// In other words, a set of segments in a 2D space.
///
/// This representation has the advatange of avoiding hash map
/// allocations for uniform queries.
pub(crate) enum VersionedKeySpaceQuery {
/// Variant for queries at a single [`Lsn`]
Uniform { keyspace: KeySpace, lsn: Lsn },
/// Variant for queries at multiple [`Lsn`]s
Scattered {
keyspaces_at_lsn: Vec<(Lsn, KeySpace)>,
},
}
impl VersionedKeySpaceQuery {
pub(crate) fn uniform(keyspace: KeySpace, lsn: Lsn) -> Self {
Self::Uniform { keyspace, lsn }
}
pub(crate) fn scattered(keyspaces_at_lsn: Vec<(Lsn, KeySpace)>) -> Self {
Self::Scattered { keyspaces_at_lsn }
}
/// Returns the most recent (largest) LSN included in the query.
/// If any of the LSNs included in the query are invalid, returns
/// an error instead.
fn high_watermark_lsn(&self) -> Result<Lsn, GetVectoredError> {
match self {
Self::Uniform { lsn, .. } => {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(*lsn));
}
Ok(*lsn)
}
Self::Scattered { keyspaces_at_lsn } => {
let mut max_lsn = None;
for (lsn, _keyspace) in keyspaces_at_lsn.iter() {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(*lsn));
}
max_lsn = std::cmp::max(max_lsn, Some(lsn));
}
if let Some(computed) = max_lsn {
Ok(*computed)
} else {
Err(GetVectoredError::Other(anyhow!("empty input")))
}
}
}
}
/// Returns the total keyspace being queried: the result of projecting
/// everything in the key dimensions onto the key axis.
fn total_keyspace(&self) -> KeySpace {
match self {
Self::Uniform { keyspace, .. } => keyspace.clone(),
Self::Scattered { keyspaces_at_lsn } => keyspaces_at_lsn
.iter()
.map(|(_lsn, keyspace)| keyspace)
.fold(KeySpace::default(), |mut acc, v| {
acc.merge(v);
acc
}),
}
}
/// Returns LSN for a specific key.
///
/// Invariant: requested key must be part of [`Self::total_keyspace`]
fn map_key_to_lsn(&self, key: &Key) -> Lsn {
match self {
Self::Uniform { lsn, .. } => *lsn,
Self::Scattered { keyspaces_at_lsn } => {
keyspaces_at_lsn
.iter()
.find(|(_lsn, keyspace)| keyspace.contains(key))
.expect("Returned key was requested")
.0
}
}
}
/// Remove any parts of the query (segments) which overlap with the provided
/// key space (also segments).
fn remove_overlapping_with(&mut self, to_remove: &KeySpace) -> KeySpace {
match self {
Self::Uniform { keyspace, .. } => keyspace.remove_overlapping_with(to_remove),
Self::Scattered { keyspaces_at_lsn } => {
let mut removed_accum = KeySpaceRandomAccum::new();
keyspaces_at_lsn.iter_mut().for_each(|(_lsn, keyspace)| {
let removed = keyspace.remove_overlapping_with(to_remove);
removed_accum.add_keyspace(removed);
});
removed_accum.to_keyspace()
}
}
}
fn is_empty(&self) -> bool {
match self {
Self::Uniform { keyspace, .. } => keyspace.is_empty(),
Self::Scattered { keyspaces_at_lsn } => keyspaces_at_lsn
.iter()
.all(|(_lsn, keyspace)| keyspace.is_empty()),
}
}
/// "Lower" the query on the LSN dimension
fn lower(&mut self, to: Lsn) {
match self {
Self::Uniform { lsn, .. } => {
// If the originally requested LSN is smaller than the starting
// LSN of the ancestor we are descending into, we need to respect that.
// Hence the min.
*lsn = std::cmp::min(*lsn, to);
}
Self::Scattered { keyspaces_at_lsn } => {
keyspaces_at_lsn.iter_mut().for_each(|(lsn, _keyspace)| {
*lsn = std::cmp::min(*lsn, to);
});
}
}
}
}
impl std::fmt::Display for VersionedKeySpaceQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
match self {
VersionedKeySpaceQuery::Uniform { keyspace, lsn } => {
write!(f, "{keyspace} @ {lsn}")?;
}
VersionedKeySpaceQuery::Scattered { keyspaces_at_lsn } => {
for (lsn, keyspace) in keyspaces_at_lsn.iter() {
write!(f, "{keyspace} @ {lsn},")?;
}
}
}
write!(f, "]")
}
}
impl Timeline {
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
@@ -3949,16 +4116,15 @@ impl Timeline {
/// 2.4. If the fringe is empty, go back to 1
async fn get_vectored_reconstruct_data(
&self,
mut keyspace: KeySpace,
request_lsn: Lsn,
mut query: VersionedKeySpaceQuery,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let original_hwm_lsn = query.high_watermark_lsn().unwrap();
let mut timeline_owned: Arc<Timeline>;
let mut timeline = self;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
let missing_keyspace = loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
@@ -3975,15 +4141,14 @@ impl Timeline {
parent: crnt_perf_span,
"PLAN_IO_TIMELINE",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
high_watermark_lsn = %query.high_watermark_lsn().unwrap(),
)
})
.attached_child();
Self::get_vectored_reconstruct_data_timeline(
timeline,
keyspace.clone(),
cont_lsn,
&query,
reconstruct_state,
&self.cancel,
&ctx,
@@ -3992,23 +4157,23 @@ impl Timeline {
.await?
};
keyspace.remove_overlapping_with(&completed);
query.remove_overlapping_with(&completed);
// Do not descend into the ancestor timeline for aux files.
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
// stalling compaction.
keyspace.remove_overlapping_with(&KeySpace {
query.remove_overlapping_with(&KeySpace {
ranges: vec![NON_INHERITED_RANGE, Key::sparse_non_inherited_keyspace()],
});
// Keyspace is fully retrieved
if keyspace.is_empty() {
if query.is_empty() {
break None;
}
let Some(ancestor_timeline) = timeline.ancestor_timeline.as_ref() else {
// Not fully retrieved but no ancestor timeline.
break Some(keyspace);
break Some(query.total_keyspace());
};
// Now we see if there are keys covered by the image layer but does not exist in the
@@ -4019,7 +4184,7 @@ impl Timeline {
// keys from `keyspace`, we expect there to be no overlap between it and the image covered key
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
let mut removed = query.remove_overlapping_with(&image_covered_keyspace);
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
// figuring out what is the inherited key range and do a fine-grained pruning.
@@ -4029,11 +4194,11 @@ impl Timeline {
if !removed.is_empty() {
break Some(removed);
}
// If we reached this point, `remove_overlapping_with` should not have made any change to the
// keyspace.
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
// Each key range in the original query is at some point in the LSN space.
// When descending into the ancestor, lower all ranges in the LSN space
// such that new changes on the parent timeline are not visible.
query.lower(timeline.ancestor_lsn);
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
@@ -4042,7 +4207,6 @@ impl Timeline {
parent: crnt_perf_span,
"GET_ANCESTOR",
timeline = %timeline.timeline_id,
lsn = %cont_lsn,
ancestor = %ancestor_timeline.timeline_id,
ancestor_lsn = %timeline.ancestor_lsn
)
@@ -4072,22 +4236,47 @@ impl Timeline {
};
if let Some(missing_keyspace) = missing_keyspace {
return Err(GetVectoredError::MissingKey(MissingKeyError {
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
shard: self
.shard_identity
.get_shard_number(&missing_keyspace.start().unwrap()),
cont_lsn,
request_lsn,
return Err(GetVectoredError::MissingKey(Box::new(MissingKeyError {
keyspace: missing_keyspace, /* better if we can store the full keyspace */
shard: self.shard_identity.number,
original_hwm_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
backtrace: None,
read_path: std::mem::take(&mut reconstruct_state.read_path),
}));
query: None,
})));
}
Ok(())
}
async fn get_vectored_init_fringe(
&self,
query: &VersionedKeySpaceQuery,
) -> Result<LayerFringe, GetVectoredError> {
let mut fringe = LayerFringe::new();
let guard = self.layers.read().await;
match query {
VersionedKeySpaceQuery::Uniform { keyspace, lsn } => {
// LSNs requested by the compute or determined by the pageserver
// are inclusive. Queries to the layer map use exclusive LSNs.
// Hence, bump the value before the query - same in the other
// match arm.
let cont_lsn = Lsn(lsn.0 + 1);
guard.update_search_fringe(keyspace, cont_lsn, &mut fringe)?;
}
VersionedKeySpaceQuery::Scattered { keyspaces_at_lsn } => {
for (lsn, keyspace) in keyspaces_at_lsn.iter() {
let cont_lsn_for_keyspace = Lsn(lsn.0 + 1);
guard.update_search_fringe(keyspace, cont_lsn_for_keyspace, &mut fringe)?;
}
}
}
Ok(fringe)
}
/// Collect the reconstruct data for a keyspace from the specified timeline.
///
/// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
@@ -4106,8 +4295,7 @@ impl Timeline {
/// decides how to deal with these two keyspaces.
async fn get_vectored_reconstruct_data_timeline(
timeline: &Timeline,
keyspace: KeySpace,
mut cont_lsn: Lsn,
query: &VersionedKeySpaceQuery,
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
@@ -4123,14 +4311,7 @@ impl Timeline {
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
// Initialize the fringe
let mut fringe = {
let mut fringe = LayerFringe::new();
let guard = timeline.layers.read().await;
guard.update_search_fringe(&keyspace, cont_lsn, &mut fringe)?;
fringe
};
let mut fringe = timeline.get_vectored_init_fringe(query).await?;
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
@@ -4156,7 +4337,7 @@ impl Timeline {
.await?;
let mut unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
let cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
@@ -4991,13 +5172,11 @@ impl Timeline {
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let query =
VersionedKeySpaceQuery::uniform(key_request_accum.consume_keyspace(), lsn);
let results = self
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.get_vectored(query, io_concurrency.clone(), ctx)
.await?;
if self.cancel.is_cancelled() {
@@ -5086,7 +5265,11 @@ impl Timeline {
// Directly use `get_vectored_impl` to skip the max_vectored_read_key limit check. Note that the keyspace should
// not contain too many keys, otherwise this takes a lot of memory.
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
.get_vectored_impl(
VersionedKeySpaceQuery::uniform(partition.clone(), lsn),
&mut reconstruct_state,
ctx,
)
.await?;
let (data, total_kb_retrieved, total_keys_retrieved) = {
let mut new_data = BTreeMap::new();

View File

@@ -80,6 +80,7 @@ impl std::fmt::Display for GcCompactionJobId {
pub struct GcCompactionCombinedSettings {
pub gc_compaction_enabled: bool,
pub gc_compaction_verification: bool,
pub gc_compaction_initial_threshold_kb: u64,
pub gc_compaction_ratio_percent: u64,
}
@@ -225,6 +226,7 @@ impl GcCompactionQueue {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
..
} = timeline.get_gc_compaction_settings();
if !gc_compaction_enabled {
return Ok(());
@@ -788,6 +790,114 @@ impl KeyHistoryRetention {
}
Ok(())
}
/// Verify if every key in the retention is readable by replaying the logs.
async fn verify(
&self,
key: Key,
base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
full_history: &[(Key, Lsn, Value)],
tline: &Arc<Timeline>,
) -> anyhow::Result<()> {
// Usually the min_lsn should be the first record but we do a full iteration to be safe.
let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
// This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
return Ok(());
};
let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
// This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
return Ok(());
};
let mut base_img = base_img_from_ancestor
.as_ref()
.map(|(_, lsn, img)| (*lsn, img));
let mut history = Vec::new();
async fn collect_and_verify(
key: Key,
lsn: Lsn,
base_img: &Option<(Lsn, &Bytes)>,
history: &[(Lsn, &NeonWalRecord)],
tline: &Arc<Timeline>,
) -> anyhow::Result<()> {
let mut records = history
.iter()
.map(|(lsn, val)| (*lsn, (*val).clone()))
.collect::<Vec<_>>();
// WAL redo requires records in the reverse LSN order
records.reverse();
let data = ValueReconstructState {
img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
records,
};
tline
.reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
.await
.with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
Ok(())
}
for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
for (lsn, val) in logs {
match val {
Value::Image(img) => {
base_img = Some((*lsn, img));
history.clear();
}
Value::WalRecord(rec) if val.will_init() => {
base_img = None;
history.clear();
history.push((*lsn, rec));
}
Value::WalRecord(rec) => {
history.push((*lsn, rec));
}
}
}
if *retain_lsn >= min_lsn {
// Only verify after the key appears in the full history for the first time.
if base_img.is_none() && history.is_empty() {
anyhow::bail!(
"verificatoin failed: key {} has no history at {}",
key,
retain_lsn
);
};
// We don't modify history: in theory, we could replace the history with a single
// image as in `generate_key_retention` to make redos at later LSNs faster. But we
// want to verify everything as if they are read from the real layer map.
collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
}
}
for (lsn, val) in &self.above_horizon.0 {
match val {
Value::Image(img) => {
// Above the GC horizon, we verify every time we see an image.
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
base_img = Some((*lsn, img));
history.clear();
}
Value::WalRecord(rec) if val.will_init() => {
// Above the GC horizon, we verify every time we see an init record.
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
base_img = None;
history.clear();
history.push((*lsn, rec));
}
Value::WalRecord(rec) => {
history.push((*lsn, rec));
}
}
}
// Ensure the latest record is readable.
collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
Ok(())
}
}
#[derive(Debug, Serialize, Default)]
@@ -1119,7 +1229,17 @@ impl Timeline {
// being potentially much longer.
let rewrite_max = partition_count;
self.compact_shard_ancestors(rewrite_max, ctx).await?;
let outcome = self
.compact_shard_ancestors(
rewrite_max,
options.flags.contains(CompactFlags::YieldForL0),
ctx,
)
.await?;
match outcome {
CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
CompactionOutcome::Done | CompactionOutcome::Skipped => {}
}
}
Ok(CompactionOutcome::Done)
@@ -1136,8 +1256,10 @@ impl Timeline {
async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
yield_for_l0: bool,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
) -> Result<CompactionOutcome, CompactionError> {
let mut outcome = CompactionOutcome::Done;
let mut drop_layers = Vec::new();
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
@@ -1148,12 +1270,7 @@ impl Timeline {
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
tracing::info!(
"starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}",
*latest_gc_cutoff,
self.gc_info.read().unwrap().cutoffs.time
);
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read().await;
for layer_desc in layers.layer_map()?.iter_historic_layers() {
@@ -1171,8 +1288,8 @@ impl Timeline {
// This ancestral layer only covers keys that belong to other shards.
// We include the full metadata in the log: if we had some critical bug that caused
// us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard.",
debug!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard",
);
if cfg!(debug_assertions) {
@@ -1234,19 +1351,35 @@ impl Timeline {
}
if layers_to_rewrite.len() >= rewrite_max {
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
layers_to_rewrite.len()
);
continue;
outcome = CompactionOutcome::Pending;
break;
}
// Fall through: all our conditions for doing a rewrite passed.
layers_to_rewrite.push(layer);
}
// Drop read lock on layer map before we start doing time-consuming I/O
// Drop read lock on layer map before we start doing time-consuming I/O.
drop(layers);
// Drop out early if there's nothing to do.
if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
return Ok(CompactionOutcome::Done);
}
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,
pitr_cutoff,
);
let started = Instant::now();
let mut replace_image_layers = Vec::new();
for layer in layers_to_rewrite {
@@ -1254,7 +1387,7 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
tracing::info!(layer=%layer, "Rewriting layer after shard split...");
info!(layer=%layer, "rewriting layer after shard split");
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -1292,7 +1425,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
layer.metadata().file_size,
new_layer.metadata().file_size);
@@ -1302,6 +1435,26 @@ impl Timeline {
// the layer has no data for us with the ShardedRange check above, but
drop_layers.push(layer);
}
// Yield for L0 compaction if necessary, but make sure we update the layer map below
// with the work we've already done.
if yield_for_l0
&& self
.l0_compaction_trigger
.notified()
.now_or_never()
.is_some()
{
info!("shard ancestor compaction yielding for L0 compaction");
outcome = CompactionOutcome::YieldForL0;
break;
}
}
for layer in &drop_layers {
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split (no keys for this shard)",
);
}
// At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
@@ -1319,17 +1472,36 @@ impl Timeline {
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
match self.remote_client.wait_completion().await {
Ok(()) => (),
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
return Err(CompactionError::ShuttingDown);
if outcome != CompactionOutcome::YieldForL0 {
info!("shard ancestor compaction waiting for uploads");
tokio::select! {
result = self.remote_client.wait_completion() => match result {
Ok(()) => {},
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
return Err(CompactionError::ShuttingDown);
}
},
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
// here, because we've already done the actual work.
_ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
}
}
info!(
"shard ancestor compaction done in {:.3}s{}",
started.elapsed().as_secs_f64(),
match outcome {
CompactionOutcome::Pending =>
format!(", with pending work (rewrite_max={rewrite_max})"),
CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
}
);
fail::fail_point!("compact-shard-ancestors-persistent");
Ok(())
Ok(outcome)
}
/// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
@@ -2148,6 +2320,7 @@ impl Timeline {
/// ```
///
/// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn generate_key_retention(
self: &Arc<Timeline>,
key: Key,
@@ -2156,6 +2329,7 @@ impl Timeline {
retain_lsn_below_horizon: &[Lsn],
delta_threshold_cnt: usize,
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
verification: bool,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
@@ -2242,8 +2416,8 @@ impl Timeline {
"should have at least below + above horizon batches"
);
let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
if let Some((key, lsn, img)) = base_img_from_ancestor {
replay_history.push((key, lsn, Value::Image(img)));
if let Some((key, lsn, ref img)) = base_img_from_ancestor {
replay_history.push((key, lsn, Value::Image(img.clone())));
}
/// Generate debug information for the replay history
@@ -2357,22 +2531,15 @@ impl Timeline {
// Whether to reconstruct the image. In debug mode, we will generate an image
// at every retain_lsn to ensure data is not corrupted, but we won't put the
// image into the final layer.
let generate_image = produce_image || debug_mode;
if produce_image {
let img_and_lsn = if produce_image {
records_since_last_image = 0;
}
let img_and_lsn = if generate_image {
let replay_history_for_debug = if debug_mode {
Some(replay_history.clone())
} else {
None
};
let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
let history = if produce_image {
std::mem::take(&mut replay_history)
} else {
replay_history.clone()
};
let history = std::mem::take(&mut replay_history);
let mut img = None;
let mut records = Vec::with_capacity(history.len());
if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
@@ -2407,6 +2574,7 @@ impl Timeline {
records.push((lsn, rec));
}
}
// WAL redo requires records in the reverse LSN order
records.reverse();
let state = ValueReconstructState { img, records };
// last batch does not generate image so i is always in range, unless we force generate
@@ -2439,10 +2607,16 @@ impl Timeline {
assert_eq!(retention.len(), lsn_split_points.len() + 1);
for (idx, logs) in retention.into_iter().enumerate() {
if idx == lsn_split_points.len() {
return Ok(KeyHistoryRetention {
let retention = KeyHistoryRetention {
below_horizon: result,
above_horizon: KeyLogAtLsn(logs),
});
};
if verification {
retention
.verify(key, &base_img_from_ancestor, full_history, self)
.await?;
}
return Ok(retention);
} else {
result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
}
@@ -2909,6 +3083,9 @@ impl Timeline {
}
(false, res)
};
let verification = self.get_gc_compaction_settings().gc_compaction_verification;
info!(
"picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
job_desc.selected_layers.len(),
@@ -3225,6 +3402,7 @@ impl Timeline {
.await
.context("failed to get ancestor image")
.map_err(CompactionError::Other)?,
verification,
)
.await
.context("failed to generate key retention")
@@ -3265,6 +3443,7 @@ impl Timeline {
.await
.context("failed to get ancestor image")
.map_err(CompactionError::Other)?,
verification,
)
.await
.context("failed to generate key retention")

View File

@@ -30,6 +30,7 @@ use crate::tenant::storage_layer::{
AsLayerDesc as _, DeltaLayerWriter, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer,
ValuesReconstructState,
};
use crate::tenant::timeline::VersionedKeySpaceQuery;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
#[derive(Debug, thiserror::Error)]
@@ -212,13 +213,9 @@ async fn generate_tombstone_image_layer(
}
}
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key_range.clone()), image_lsn);
let data = ancestor
.get_vectored_impl(
KeySpace::single(key_range.clone()),
image_lsn,
&mut reconstruct_state,
ctx,
)
.get_vectored_impl(query, &mut reconstruct_state, ctx)
.await
.context("failed to retrieve aux keys")
.map_err(|e| Error::launder(e, Error::Prepare))?;

View File

@@ -580,6 +580,7 @@ impl ConnectionManagerState {
);
Ok(())
}
WalReceiverError::Cancelled => Ok(()),
WalReceiverError::Other(e) => {
// give out an error to have task_mgr give it a really verbose logging
if cancellation.is_cancelled() {

View File

@@ -73,6 +73,7 @@ pub(super) enum WalReceiverError {
/// Generic error
Other(anyhow::Error),
ClosedGate,
Cancelled,
}
impl From<tokio_postgres::Error> for WalReceiverError {
@@ -200,6 +201,9 @@ pub(super) async fn handle_walreceiver_connection(
// with a similar error.
},
WalReceiverError::SuccessfulCompletion(_) => {}
WalReceiverError::Cancelled => {
debug!("Connection cancelled")
}
WalReceiverError::ClosedGate => {
// doesn't happen at runtime
}
@@ -273,7 +277,12 @@ pub(super) async fn handle_walreceiver_connection(
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let shard = vec![*timeline.get_shard_identity()];

View File

@@ -21,13 +21,13 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use anyhow::{Result, bail};
use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
@@ -38,7 +38,7 @@ use postgres_ffi::{
fsm_logical_to_physical, pg_constants,
};
use tracing::*;
use utils::bin_ser::SerializeError;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
@@ -104,12 +104,101 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
pub struct WalIngestError {
pub backtrace: std::backtrace::Backtrace,
pub kind: WalIngestErrorKind,
}
#[derive(thiserror::Error, Debug)]
pub enum WalIngestErrorKind {
#[error(transparent)]
#[allow(private_interfaces)]
PageReconstructError(#[from] PageReconstructError),
#[error(transparent)]
DeserializationFailure(#[from] DeserializeError),
#[error(transparent)]
SerializationFailure(#[from] SerializeError),
#[error("the request contains data not supported by pageserver: {0} @ {1}")]
InvalidKey(Key, Lsn),
#[error("twophase file for xid {0} already exists")]
FileAlreadyExists(u64),
#[error("slru segment {0:?}/{1} already exists")]
SlruAlreadyExists(SlruKind, u32),
#[error("relation already exists")]
RelationAlreadyExists(RelTag),
#[error("invalid reldir key {0}")]
InvalidRelDirKey(Key),
#[error(transparent)]
LogicalError(anyhow::Error),
#[error(transparent)]
EncodeAuxFileError(anyhow::Error),
#[error(transparent)]
MaybeRelSizeV2Error(anyhow::Error),
#[error("timeline shutting down")]
Cancelled,
}
impl<T> From<T> for WalIngestError
where
WalIngestErrorKind: From<T>,
{
fn from(value: T) -> Self {
WalIngestError {
backtrace: Backtrace::capture(),
kind: WalIngestErrorKind::from(value),
}
}
}
impl std::error::Error for WalIngestError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
impl core::fmt::Display for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.kind.fmt(f)
}
}
impl core::fmt::Debug for WalIngestError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
if f.alternate() {
f.debug_map()
.key(&"backtrace")
.value(&self.backtrace)
.key(&"kind")
.value(&self.kind)
.finish()
} else {
writeln!(f, "Error: {:?}", self.kind)?;
if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
}
Ok(())
}
}
}
#[macro_export]
macro_rules! ensure_walingest {
($($t:tt)*) => {
_ = || -> Result<(), anyhow::Error> {
anyhow::ensure!($($t)*);
Ok(())
}().map_err(WalIngestErrorKind::LogicalError)?;
};
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> Result<WalIngest, WalIngestError> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -145,7 +234,7 @@ impl WalIngest {
interpreted: InterpretedWalRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
) -> Result<bool, WalIngestError> {
WAL_INGEST.records_received.inc();
let prev_len = modification.len();
@@ -288,7 +377,7 @@ impl WalIngest {
}
/// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64> {
fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
let next_full_xid =
enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });
@@ -298,9 +387,9 @@ impl WalIngest {
if xid > next_xid {
// Wraparound occurred, must be from a prev epoch.
if epoch == 0 {
bail!(
Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
"apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
);
)))?;
}
epoch -= 1;
}
@@ -313,7 +402,7 @@ impl WalIngest {
clear_vm_bits: ClearVmBits,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClearVmBits {
new_heap_blkno,
old_heap_blkno,
@@ -402,7 +491,7 @@ impl WalIngest {
create: DbaseCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseCreate {
db_id,
tablespace_id,
@@ -505,7 +594,7 @@ impl WalIngest {
dbase_drop: DbaseDrop,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let DbaseDrop {
db_id,
tablespace_ids,
@@ -523,7 +612,7 @@ impl WalIngest {
create: SmgrCreate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let SmgrCreate { rel } = create;
self.put_rel_creation(modification, rel, ctx).await?;
Ok(())
@@ -537,7 +626,7 @@ impl WalIngest {
truncate: XlSmgrTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let XlSmgrTruncate {
blkno,
rnode,
@@ -689,7 +778,7 @@ impl WalIngest {
record: XactRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let (xact_common, is_commit, is_prepared) = match record {
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
let xid: u64 = if modification.tline.pg_version >= 17 {
@@ -813,7 +902,7 @@ impl WalIngest {
truncate: ClogTruncate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogTruncate {
pageno,
oldest_xid,
@@ -889,7 +978,7 @@ impl WalIngest {
zero_page: ClogZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
let ClogZeroPage { segno, rpageno } = zero_page;
self.put_slru_page_image(
@@ -907,7 +996,7 @@ impl WalIngest {
&mut self,
modification: &mut DatadirModification,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
) -> Result<(), WalIngestError> {
// Create WAL record for updating the multixact-offsets page
let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1010,7 +1099,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
@@ -1058,7 +1147,7 @@ impl WalIngest {
zero_page: MultiXactZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let MultiXactZeroPage {
slru_kind,
segno,
@@ -1080,7 +1169,7 @@ impl WalIngest {
update: RelmapUpdate,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RelmapUpdate { update, buf } = update;
modification
@@ -1093,7 +1182,7 @@ impl WalIngest {
raw_record: RawXlogRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let RawXlogRecord { info, lsn, mut buf } = raw_record;
let pg_version = modification.tline.pg_version;
@@ -1235,12 +1324,12 @@ impl WalIngest {
put: PutLogicalMessage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
let PutLogicalMessage { path, buf } = put;
modification.put_file(path.as_str(), &buf, ctx).await
}
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
match record {
StandbyRecord::RunningXacts(running_xacts) => {
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
@@ -1258,7 +1347,7 @@ impl WalIngest {
&mut self,
record: ReploriginRecord,
modification: &mut DatadirModification<'_>,
) -> Result<()> {
) -> Result<(), WalIngestError> {
match record {
ReploriginRecord::Set(set) => {
modification
@@ -1278,7 +1367,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_creation(rel, 0, ctx).await?;
Ok(())
}
@@ -1291,7 +1380,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_page_image(rel, blknum, img)?;
@@ -1305,7 +1394,7 @@ impl WalIngest {
blknum: BlockNumber,
rec: NeonWalRecord,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
self.handle_rel_extend(modification, rel, blknum, ctx)
.await?;
modification.put_rel_wal_record(rel, blknum, rec)?;
@@ -1318,7 +1407,7 @@ impl WalIngest {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
modification.put_rel_truncation(rel, nblocks, ctx).await?;
Ok(())
}
@@ -1329,7 +1418,7 @@ impl WalIngest {
rel: RelTag,
blknum: BlockNumber,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<(), WalIngestError> {
let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first
// record.
@@ -1423,7 +1512,7 @@ impl WalIngest {
blknum: BlockNumber,
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
) -> Result<(), WalIngestError> {
if !self.shard.is_shard_zero() {
return Ok(());
}
@@ -1441,7 +1530,7 @@ impl WalIngest {
segno: u32,
blknum: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), WalIngestError> {
// we don't use a cache for this like we do for relations. SLRUS are explcitly
// extended with ZEROPAGE records, not with commit records, so it happens
// a lot less frequently.
@@ -1509,6 +1598,7 @@ async fn get_relsize(
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use anyhow::Result;
use postgres_ffi::RELSEG_SIZE;
use super::*;
@@ -1530,7 +1620,7 @@ mod tests {
}
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;

View File

@@ -1362,7 +1362,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.branch_id",
"Neon branch_id the server is running on",
NULL,
@@ -1370,7 +1370,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomStringVariable("neon.endpoint_id",
"Neon endpoint_id the server is running on",
NULL,
@@ -1378,7 +1378,7 @@ pg_init_libpagestore(void)
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
check_neon_id, NULL, NULL);
DefineCustomIntVariable("neon.stripe_size",
"sharding stripe size",

View File

@@ -509,7 +509,14 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
drop(redis_kv_client);
// `handle_cancel_messages` was terminated due to the tx_cancel
// being dropped. this is not worthy of an error, and this task can only return `Err`,
// so let's wait forever instead.
std::future::pending().await
});
}

View File

@@ -1,16 +1,17 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use anyhow::{Context, anyhow};
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::CancelToken;
use postgres_client::tls::MakeTlsConnect;
use pq_proto::CancelKeyData;
use redis::{FromRedisValue, Pipeline, Value, pipe};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info};
use tracing::{debug, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::{AuthError, check_peer_addr_is_in_list};
@@ -30,6 +31,7 @@ type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -54,78 +56,168 @@ pub enum CancelKeyOp {
},
}
impl CancelKeyOp {
fn register(self, pipe: &mut Pipeline) -> Option<CancelReplyOp> {
#[allow(clippy::used_underscore_binding)]
match self {
CancelKeyOp::StoreCancelKey {
key,
field,
value,
resp_tx,
_guard,
expire,
} => {
pipe.hset(&key, field, value);
pipe.expire(key, expire);
let resp_tx = resp_tx?;
Some(CancelReplyOp::StoreCancelKey { resp_tx, _guard })
}
CancelKeyOp::GetCancelData {
key,
resp_tx,
_guard,
} => {
pipe.hgetall(key);
Some(CancelReplyOp::GetCancelData { resp_tx, _guard })
}
CancelKeyOp::RemoveCancelKey {
key,
field,
resp_tx,
_guard,
} => {
pipe.hdel(key, field);
let resp_tx = resp_tx?;
Some(CancelReplyOp::RemoveCancelKey { resp_tx, _guard })
}
}
}
}
// Message types for sending through mpsc channel
pub enum CancelReplyOp {
StoreCancelKey {
resp_tx: oneshot::Sender<anyhow::Result<()>>,
_guard: CancelChannelSizeGuard<'static>,
},
GetCancelData {
resp_tx: oneshot::Sender<anyhow::Result<Vec<(String, String)>>>,
_guard: CancelChannelSizeGuard<'static>,
},
RemoveCancelKey {
resp_tx: oneshot::Sender<anyhow::Result<()>>,
_guard: CancelChannelSizeGuard<'static>,
},
}
impl CancelReplyOp {
fn send_err(self, e: anyhow::Error) {
match self {
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
resp_tx
.send(Err(e))
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
resp_tx
.send(Err(e))
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
resp_tx
.send(Err(e))
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
}
}
fn send_value(self, v: redis::Value) {
match self {
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
let send =
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
resp_tx
.send(send)
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
let send =
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
resp_tx
.send(send)
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
let send =
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
resp_tx
.send(send)
.inspect_err(|_| tracing::debug!("could not send reply"))
.ok();
}
}
}
}
// Running as a separate task to accept messages through the rx channel
// In case of problems with RTT: switch to recv_many() + redis pipeline
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
) -> anyhow::Result<Infallible> {
) -> anyhow::Result<()> {
let mut batch = Vec::new();
let mut replies = vec![];
loop {
if let Some(msg) = rx.recv().await {
match msg {
CancelKeyOp::StoreCancelKey {
key,
field,
value,
resp_tx,
_guard,
expire,
} => {
let res = client.hset(&key, field, value).await;
if let Some(resp_tx) = resp_tx {
if res.is_ok() {
resp_tx
.send(client.expire(key, expire).await)
.inspect_err(|e| {
tracing::debug!(
"failed to send StoreCancelKey response: {:?}",
e
);
})
.ok();
} else {
resp_tx
.send(res)
.inspect_err(|e| {
tracing::debug!(
"failed to send StoreCancelKey response: {:?}",
e
);
})
.ok();
}
} else if res.is_ok() {
drop(client.expire(key, expire).await);
} else {
tracing::warn!("failed to store cancel key: {:?}", res);
}
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
let batch_size = batch.len();
debug!(batch_size, "running cancellation jobs");
let mut pipe = pipe();
for msg in batch.drain(..) {
if let Some(reply) = msg.register(&mut pipe) {
replies.push(reply);
} else {
pipe.ignore();
}
}
let responses = replies.len();
match client.query(pipe).await {
// for each reply, we expect that many values.
Ok(Value::Array(values)) if values.len() == responses => {
debug!(
batch_size,
responses, "successfully completed cancellation jobs",
);
for (value, reply) in std::iter::zip(values, replies.drain(..)) {
reply.send_value(value);
}
CancelKeyOp::GetCancelData {
key,
resp_tx,
_guard,
} => {
drop(resp_tx.send(client.hget_all(key).await));
}
Ok(value) => {
debug!(?value, "unexpected redis return value");
for reply in replies.drain(..) {
reply.send_err(anyhow!("incorrect response type from redis"));
}
CancelKeyOp::RemoveCancelKey {
key,
field,
resp_tx,
_guard,
} => {
if let Some(resp_tx) = resp_tx {
resp_tx
.send(client.hdel(key, field).await)
.inspect_err(|e| {
tracing::debug!("failed to send StoreCancelKey response: {:?}", e);
})
.ok();
} else {
drop(client.hdel(key, field).await);
}
}
Err(err) => {
for reply in replies.drain(..) {
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
}
}
}
replies.clear();
}
}

View File

@@ -1,4 +1,5 @@
use redis::{AsyncCommands, ToRedisArgs};
use redis::aio::ConnectionLike;
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
@@ -8,6 +9,23 @@ pub struct RedisKVClient {
limiter: GlobalRateLimiter,
}
#[allow(async_fn_in_trait)]
pub trait Queryable {
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
}
impl Queryable for Pipeline {
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
self.query_async(conn).await
}
}
impl Queryable for Cmd {
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
self.query_async(conn).await
}
}
impl RedisKVClient {
pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
Self {
@@ -27,158 +45,24 @@ impl RedisKVClient {
Ok(())
}
pub(crate) async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> anyhow::Result<()>
where
K: ToRedisArgs + Send + Sync,
F: ToRedisArgs + Send + Sync,
V: ToRedisArgs + Send + Sync,
{
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping hset");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.hset(&key, &field, &value).await {
Ok(()) => return Ok(()),
Err(e) => {
tracing::error!("failed to set a key-value pair: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
self.try_connect().await?;
self.client
.hset(key, field, value)
.await
.map_err(anyhow::Error::new)
}
#[allow(dead_code)]
pub(crate) async fn hset_multiple<K, V>(
pub(crate) async fn query<T: FromRedisValue>(
&mut self,
key: &str,
items: &[(K, V)],
) -> anyhow::Result<()>
where
K: ToRedisArgs + Send + Sync,
V: ToRedisArgs + Send + Sync,
{
q: impl Queryable,
) -> anyhow::Result<T> {
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping hset_multiple");
tracing::info!("Rate limit exceeded. Skipping query");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.hset_multiple(key, items).await {
Ok(()) => return Ok(()),
match q.query(&mut self.client).await {
Ok(t) => return Ok(t),
Err(e) => {
tracing::error!("failed to set a key-value pair: {e}");
tracing::error!("failed to run query: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
tracing::info!("Redis client is disconnected. Reconnecting...");
self.try_connect().await?;
self.client
.hset_multiple(key, items)
.await
.map_err(anyhow::Error::new)
}
#[allow(dead_code)]
pub(crate) async fn expire<K>(&mut self, key: K, seconds: i64) -> anyhow::Result<()>
where
K: ToRedisArgs + Send + Sync,
{
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping expire");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.expire(&key, seconds).await {
Ok(()) => return Ok(()),
Err(e) => {
tracing::error!("failed to set a key-value pair: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
self.try_connect().await?;
self.client
.expire(key, seconds)
.await
.map_err(anyhow::Error::new)
}
#[allow(dead_code)]
pub(crate) async fn hget<K, F, V>(&mut self, key: K, field: F) -> anyhow::Result<V>
where
K: ToRedisArgs + Send + Sync,
F: ToRedisArgs + Send + Sync,
V: redis::FromRedisValue,
{
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping hget");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.hget(&key, &field).await {
Ok(value) => return Ok(value),
Err(e) => {
tracing::error!("failed to get a value: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
self.try_connect().await?;
self.client
.hget(key, field)
.await
.map_err(anyhow::Error::new)
}
pub(crate) async fn hget_all<K, V>(&mut self, key: K) -> anyhow::Result<V>
where
K: ToRedisArgs + Send + Sync,
V: redis::FromRedisValue,
{
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping hgetall");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.hgetall(&key).await {
Ok(value) => return Ok(value),
Err(e) => {
tracing::error!("failed to get a value: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
self.try_connect().await?;
self.client.hgetall(key).await.map_err(anyhow::Error::new)
}
pub(crate) async fn hdel<K, F>(&mut self, key: K, field: F) -> anyhow::Result<()>
where
K: ToRedisArgs + Send + Sync,
F: ToRedisArgs + Send + Sync,
{
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping hdel");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.client.hdel(&key, &field).await {
Ok(()) => return Ok(()),
Err(e) => {
tracing::error!("failed to delete a key-value pair: {e}");
}
}
tracing::info!("Redis client is disconnected. Reconnectiong...");
self.try_connect().await?;
self.client
.hdel(key, field)
.await
.map_err(anyhow::Error::new)
Ok(q.query(&mut self.client).await?)
}
}

View File

@@ -22,6 +22,7 @@ use pageserver_api::controller_api::{
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
TimelineImportRequest,
};
use pageserver_api::models::{
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
@@ -1286,6 +1287,37 @@ async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiE
)
}
async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
let state = get_state(&req);
if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
import_req.tenant_id,
import_req.timeline_id
)));
}
json_response(
StatusCode::OK,
state.service.timeline_import(import_req).await?,
)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1959,6 +1991,16 @@ pub fn make_router(
RequestName("debug_v1_tenant_locate"),
)
})
.post(
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
|r| {
named_request_span(
r,
handle_timeline_import,
RequestName("debug_v1_timeline_import"),
)
},
)
.get("/debug/v1/scheduler", |r| {
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})

View File

@@ -1852,6 +1852,7 @@ impl Service {
};
if insert {
let config = attach_req.config.clone().unwrap_or_default();
let tsp = TenantShardPersistence {
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
@@ -1860,7 +1861,7 @@ impl Service {
generation: attach_req.generation_override.or(Some(0)),
generation_pageserver: None,
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
@@ -1883,16 +1884,16 @@ impl Service {
Ok(()) => {
tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
let mut shard = TenantShard::new(
attach_req.tenant_shard_id,
TenantShard::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
);
shard.config = config;
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(attach_req.tenant_shard_id, shard);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
}
}
@@ -1977,11 +1978,12 @@ impl Service {
.set_attached(scheduler, attach_req.node_id);
tracing::info!(
"attach_hook: tenant {} set generation {:?}, pageserver {}",
"attach_hook: tenant {} set generation {:?}, pageserver {}, config {:?}",
attach_req.tenant_shard_id,
tenant_shard.generation,
// TODO: this is an odd number of 0xf's
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)),
attach_req.config,
);
// Trick the reconciler into not doing anything for this tenant: this helps

View File

@@ -12,13 +12,16 @@ use crate::persistence::{
use crate::safekeeper::Safekeeper;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::lsn::Lsn;
use super::Service;
@@ -298,6 +301,31 @@ impl Service {
timeline_id,
})
}
/// Directly insert the timeline into the database without reconciling it with safekeepers.
///
/// Useful if the timeline already exists on the specified safekeepers,
/// but we want to make it storage controller managed.
pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
let persistence = TimelinePersistence {
tenant_id: req.tenant_id.to_string(),
timeline_id: req.timeline_id.to_string(),
start_lsn: Lsn::INVALID.into(),
generation: 1,
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
};
let inserted = self.persistence.insert_timeline(persistence).await?;
if inserted {
tracing::info!("imported timeline into db");
} else {
tracing::info!("didn't import timeline into db, as it is already present in db");
}
Ok(())
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,

View File

@@ -1255,6 +1255,7 @@ class NeonEnv:
"mode": "pipelined",
"execution": "concurrent-futures",
"max_batch_size": 32,
"batching": "scattered-lsn",
}
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
@@ -1321,6 +1322,10 @@ class NeonEnv:
log.info("test may use old binaries, ignoring warnings about unknown config items")
ps.allowed_errors.append(".*ignoring unknown configuration item.*")
# Allow old software to start until https://github.com/neondatabase/neon/pull/11275
# lands in the compatiblity snapshot.
ps_cfg["page_service_pipelining"].pop("batching")
self.pageservers.append(ps)
cfg["pageservers"].append(ps_cfg)
@@ -1986,10 +1991,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
tenant_shard_id: TenantId | TenantShardId,
pageserver_id: int,
generation_override: int | None = None,
config: None | dict[str, Any] = None,
) -> int:
body = {"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}
if generation_override is not None:
body["generation_override"] = generation_override
if config is not None:
body["config"] = config
response = self.request(
"POST",
@@ -2884,13 +2892,14 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
immediate: bool = False,
timeout_in_seconds: int | None = None,
extra_env_vars: dict[str, str] | None = None,
):
"""
High level wrapper for restart: restarts the process, and waits for
tenant state to stabilize.
"""
self.stop(immediate=immediate)
self.start(timeout_in_seconds=timeout_in_seconds)
self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
self.quiesce_tenants()
def quiesce_tenants(self):
@@ -2979,11 +2988,12 @@ class NeonPageserver(PgProtocol, LogUtils):
to call into the pageserver HTTP client.
"""
client = self.http_client()
if generation is None:
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
elif override_storage_controller_generation:
if generation is None or override_storage_controller_generation:
generation = self.env.storage_controller.attach_hook_issue(
tenant_id, self.id, generation
tenant_id,
self.id,
generation_override=generation if override_storage_controller_generation else None,
config=config,
)
return client.tenant_attach(
tenant_id,

View File

@@ -65,13 +65,11 @@ def single_timeline(
assert ps_http.tenant_list() == []
def attach(tenant):
# NB: create the new tenant in the storage controller with the correct tenant config. This
# will pick up the existing tenant data from remote storage. If we just attach it to the
# Pageserver, the storage controller will reset the tenant config to the default.
env.create_tenant(
tenant_id=tenant,
timeline_id=template_timeline,
conf=template_config,
env.pageserver.tenant_attach(
tenant,
config=template_config,
generation=100,
override_storage_controller_generation=True,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:

View File

@@ -66,11 +66,11 @@ def test_basebackup_with_high_slru_count(
n_txns = 500000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
neon_env_builder,
f"large_slru_count-{n_tenants}-{n_txns}",
n_tenants,
lambda env: setup_tenant_template(env, n_txns),
)
run_benchmark(env, pg_bin, record, duration)
@@ -80,10 +80,6 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.create_tenant(set_default=True)

View File

@@ -1,5 +1,8 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
from pathlib import Path
@@ -28,38 +31,33 @@ class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
max_batch_size: int
execution: str
batching: str
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
EXECUTION = ["concurrent-futures"]
BATCHING = ["uniform-lsn", "scattered-lsn"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
for batching in BATCHING:
NON_BATCHABLE.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
BATCHABLE: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
for batching in BATCHING:
BATCHABLE.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
@@ -137,7 +135,14 @@ def test_throughput(
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# minimal lfc & small shared buffers to force requests to pageserver
"neon.max_file_cache_size=1MB",
"shared_buffers=10MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -155,7 +160,6 @@ def test_throughput(
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
@@ -211,31 +215,73 @@ def test_throughput(
).value,
)
def workload() -> Metrics:
def workload(disruptor_started: threading.Event) -> Metrics:
disruptor_started.wait()
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
conn = endpoint.connect()
cur = conn.cursor()
iters = 0
while True:
cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
if stop_disruptor.is_set():
break
disruptor_started.set()
iters += 1
time.sleep(0.001)
return iters
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
# set trace for log analysis below
env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
log.info("Starting workload")
with concurrent.futures.ThreadPoolExecutor() as executor:
disruptor_started = threading.Event()
stop_disruptor = threading.Event()
disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
workload_fut = executor.submit(workload, disruptor_started)
metrics = workload_fut.result()
stop_disruptor.set()
ndisruptions = disruptor_fut.result()
log.info("Disruptor issued %d disrupting requests", ndisruptions)
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#
@@ -262,7 +308,10 @@ def test_throughput(
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
for batching in BATCHING:
PRECISION_CONFIGS.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
@pytest.mark.parametrize(

View File

@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
},
"rel_size_v2_enabled": False, # test suite enables it by default as of https://github.com/neondatabase/neon/issues/11081, so, custom config means disabling it
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,
"gc_compaction_ratio_percent": 200,
"image_creation_preempt_threshold": 5,

View File

@@ -162,6 +162,8 @@ def test_pageserver_compaction_preempt(
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
env = neon_env_builder.init_start(initial_tenant_conf=conf)
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

View File

@@ -16,6 +16,7 @@ def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path, kind:
"mode": "pipelined",
"max_batch_size": 32,
"execution": "concurrent-futures",
"batching": "uniform-lsn",
}
neon_env_builder.pageserver_config_override = patch_pageserver_toml

View File

@@ -15,7 +15,6 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/11395")
def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
@@ -96,17 +95,12 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
_, marker_offset = wait_until(lambda: env.pageserver.assert_log_contains(marker, offset=None))
log.info("run pagebench")
duration_secs = 10
duration_secs = 20
actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs)
log.info("validate the client is capped at the configured rps limit")
expect_ncompleted = duration_secs * rate_limit_rps
delta_abs = abs(expect_ncompleted - actual_ncompleted)
threshold = 0.05 * expect_ncompleted
assert threshold / rate_limit_rps < 0.1 * duration_secs, (
"test self-test: unrealistic expecations regarding precision in this test"
)
assert delta_abs < 0.05 * expect_ncompleted, (
assert pytest.approx(expect_ncompleted, 0.05) == actual_ncompleted, (
"the throttling deviates more than 5percent from the expectation"
)
@@ -120,6 +114,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
timeout=compaction_period,
)
log.info("validate the metrics")
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_post is not None
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
@@ -128,12 +123,13 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
actual_throttled_secs = actual_throttled_usecs / 1_000_000
log.info("validate that the metric doesn't include throttle wait time")
assert duration_secs >= 10 * actual_smgr_query_seconds, (
"smgr metrics should not include throttle wait time"
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
"throttling and processing latency = total request time; this assert validates thi holds on average"
)
log.info("validate that the throttling wait time metrics is correct")
assert pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs, (
"most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
# without this assertion, the test would pass even if the throttling was completely broken
# but the request processing is so slow that it makes up for the latency that a correct throttling
# implementation would add
assert actual_smgr_query_seconds < 0.66 * duration_secs, (
"test self-test: request processing is consuming most of the wall clock time; this risks that we're not actually testing throttling"
)

View File

@@ -239,6 +239,8 @@ def test_isolation(
"neon.regress_test_mode = true",
# Stack size should be increased for tests to pass with asan.
"max_stack_depth = 4MB",
# Neon extensiosn starts 2 BGW so decreasing number of parallel workers which can affect deadlock-parallel test if it hits max_worker_processes.
"max_worker_processes = 16",
],
)
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")