Compare commits

..

6 Commits

Author SHA1 Message Date
Joonas Koivunen
57a9d7d1e2 wip 2023-09-27 09:29:00 +00:00
Joonas Koivunen
fcc3da7642 refactor: unify error contexts not to have failed to 2023-09-27 09:29:00 +00:00
Joonas Koivunen
8449572c0f fix: switch to formatting just a string
cloning all of the paths twice seems a bit too much.
2023-09-27 09:29:00 +00:00
Joonas Koivunen
45582a0ec4 DownloadError: align Display with anyhow impl 2023-09-27 09:29:00 +00:00
Joonas Koivunen
c3a027b8b4 detect simulated errors, never print them full 2023-09-27 09:29:00 +00:00
Joonas Koivunen
4feb12548b use a pub type for simulated errors 2023-09-27 09:29:00 +00:00
37 changed files with 626 additions and 1425 deletions

View File

@@ -834,7 +834,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.17.12
VM_BUILDER_VERSION: v0.17.11
steps:
- name: Checkout

1
Cargo.lock generated
View File

@@ -3246,7 +3246,6 @@ dependencies = [
"reqwest-tracing",
"routerify",
"rstest",
"rustc-hash",
"rustls",
"rustls-pemfile",
"scopeguard",

View File

@@ -107,7 +107,6 @@ reqwest-middleware = "0.2.0"
reqwest-retry = "0.2.2"
routerify = "3"
rpds = "0.13"
rustc-hash = "1.1.0"
rustls = "0.21"
rustls-pemfile = "1"
rustls-split = "0.3"

View File

@@ -1,599 +0,0 @@
# Seamless tenant migration
- Author: john@neon.tech
- Created on 2023-08-11
- Implemented on ..
## Summary
The preceding [generation numbers RFC](025-generation-numbers.md) may be thought of as "making tenant
migration safe". Following that,
this RFC is about how those migrations are to be done:
1. Seamlessly (without interruption to client availability)
2. Quickly (enabling faster operations)
3. Efficiently (minimizing I/O and $ cost)
These points are in priority order: if we have to sacrifice
efficiency to make a migration seamless for clients, we will
do so, etc.
This is accomplished by introducing two high level changes:
- A dual-attached state for tenants, used in a control-plane-orchestrated
migration procedure that preserves availability during a migration.
- Warm secondary locations for tenants, where on-disk content is primed
for a fast migration of the tenant from its current attachment to this
secondary location.
## Motivation
Migrating tenants between pageservers is essential to operating a service
at scale, in several contexts:
1. Responding to a pageserver node failure by migrating tenants to other pageservers
2. Balancing load and capacity across pageservers, for example when a user expands their
database and they need to migrate to a pageserver with more capacity.
3. Restarting pageservers for upgrades and maintenance
The current situation steps for migration are:
- detach from old node; skip if old node is dead; (the [skip part is still WIP](https://github.com/neondatabase/cloud/issues/5426)).
- attach to new node
- re-configure endpoints to use the new node
Once [generation numbers](025-generation-numbers.md) are implemented,
the detach step is no longer critical for correctness. So, we can
- attach to a new node,
- re-configure endpoints to use the new node, and then
- detach from the old node.
However, this still does not meet our seamless/fast/efficient goals:
- Not fast: The new node will have to download potentially large amounts
of data from S3, which may take many minutes.
- Not seamless: If we attach to a new pageserver before detaching an old one,
the new one might delete some objects that interrupt availability of reads on the old one.
- Not efficient: the old pageserver will continue uploading
S3 content during the migration that will never be read.
The user expectations for availability are:
- For planned maintenance, there should be zero availability
gap. This expectation is fulfilled by this RFC.
- For unplanned changes (e.g. node failures), there should be
minimal availability gap. This RFC provides the _mechanism_
to fail over quickly, but does not provide the failure _detection_
nor failover _policy_.
## Non Goals
- Defining service tiers with different storage strategies: the same
level of HA & overhead will apply to all tenants. This doesn't rule out
adding such tiers in future.
- Enabling pageserver failover in the absence of a control plane: the control
plane will remain the source of truth for what should be attached where.
- Totally avoiding availability gaps on unplanned migrations during
a failure (we expect a small, bounded window of
read unavailability of very recent LSNs)
- Workload balancing: this RFC defines the mechanism for moving tenants
around, not the higher level logic for deciding who goes where.
- Defining all possible configuration flows for tenants: the migration process
defined in this RFC demonstrates the sufficiency of the pageserver API, but
is not the only kind of configuration change the control plane will ever do.
The APIs defined here should let the control plane move tenants around in
whatever way is needed while preserving data safety and read availability.
## Impacted components
Pageserver, control plane
## Terminology
- **Attachment**: a tenant is _attached_ to a pageserver if it has
been issued a generation number, and is running an instance of
the `Tenant` type, ingesting the WAL, and available to serve
page reads.
- **Location**: locations are a superset of attachments. A location
is a combination of a tenant and a pageserver. We may _attach_ at a _location_.
- **Secondary location**: a location which is not currently attached.
- **Warm secondary location**: a location which is not currently attached, but is endeavoring to maintain a warm local cache of layers. We avoid calling this a _warm standby_ to avoid confusion with similar postgres features.
## Implementation (high level)
### Warm secondary locations
To enable faster migrations, we will identify at least one _secondary location_
for each tenant. This secondary location will keep a warm cache of layers
for the tenant, so that if it is later attached, it can catch up with the
latest LSN quickly: rather than downloading everything, it only has to replay
the recent part of the WAL to advance from the remote_consistent_offset to the
most recent LSN in the WAL.
The control plane is responsible for selecting secondary locations, and
calling into pageservers to configure tenants into a secondary mode at this
new location, as well as attaching the tenant in its existing primary location.
The attached pageserver for a tenant will publish a [layer heatmap](#layer-heatmap)
to advise secondaries of which layers should be downloaded.
### Location modes
Currently, we consider a tenant to be in one of two states on a pageserver:
- Attached: active `Tenant` object, and layers on local disk
- Detached: no layers on local disk, no runtime state.
We will extend this with finer-grained modes, whose purpose will become
clear in later sections:
- **AttachedSingle**: equivalent the existing attached state.
- **AttachedMulti**: like AttachedSingle, holds an up to date generation, but
does not do deletions.
- **AttachedStale**: like AttachedSingle, holds a stale generation,
do not do any remote storage operations.
- **Secondary**: keep local state on disk, periodically update from S3.
- **Detached**: equivalent to existing detached state.
To control these finer grained states, a new pageserver API endpoint will be added.
### Cutover procedure
Define old location and new location as "Node A" and "Node B". Consider
the case where both nodes are available, and Node B was previously configured
as a secondary location for the tenant we are migrating.
The cutover procedure is orchestrated by the control plane, calling into
the pageservers' APIs:
1. Call to Node A requesting it to flush to S3 and enter AttachedStale state
2. Increment generation, and call to Node B requesting it to enter AttachedMulti
state with the new generation.
3. Call to Node B, requesting it to download the latest hot layers from remote storage,
according to the latest heatmap flushed by Node A.
4. Wait for Node B's WAL ingestion to catch up with node A's
5. Update endpoints to use node B instead of node A
6. Call to node B requesting it to enter state AttachedSingle.
7. Call to node A requesting it to enter state Secondary
The following table summarizes how the state of the system advances:
| Step | Node A | Node B | Node used by endpoints |
| :-----------: | :------------: | :------------: | :--------------------: |
| 1 (_initial_) | AttachedSingle | Secondary | A |
| 2 | AttachedStale | AttachedMulti | A |
| 3 | AttachedStale | AttachedMulti | A |
| 4 | AttachedStale | AttachedMulti | A |
| 5 (_cutover_) | AttachedStale | AttachedMulti | B |
| 6 | AttachedStale | AttachedSingle | B |
| 7 (_final_) | Secondary | AttachedSingle | B |
The procedure described for a clean handover from a live node to a secondary
is also used for failure cases and for migrations to a location that is not
configured as a secondary, by simply skipping irrelevant steps, as described in
the following sections.
#### Migration from an unresponsive node
If node A is unavailable, then all calls into
node A are skipped and we don't wait for B to catch up before
switching updating the endpoints to use B.
#### Migration to a location that is not a secondary
If node B is initially in Detached state, the procedure is identical. Since Node B
is coming from a Detached state rather than Secondary, the download of layers and
catch up with WAL will take much longer.
We might do this if:
- Attached and secondary locations are both critically low on disk, and we need
to migrate to a third node with more resources available.
- We are migrating a tenant which does not use secondary locations to save on cost.
#### Permanent migration away from a node
In the final step of the migration, we generally request the original node to enter a Secondary
state. This is typical if we are doing a planned migration during maintenance, or to
balance CPU/network load away from a node.
One might also want to permanently migrate away: this can be done by simply removing the secondary
location after the migration is complete, or as an optimization by substituting the Detached state
for the Secondary state in the final step.
#### Cutover diagram
```mermaid
sequenceDiagram
participant CP as Control plane
participant A as Node A
participant B as Node B
participant E as Endpoint
CP->>A: PUT Flush & go to AttachedStale
note right of A: A continues to ingest WAL
CP->>B: PUT AttachedMulti
CP->>B: PUT Download layers from latest heatmap
note right of B: B downloads from S3
loop Poll until download complete
CP->>B: GET download status
end
activate B
note right of B: B ingests WAL
loop Poll until catch up
CP->>B: GET visible WAL
CP->>A: GET visible WAL
end
deactivate B
CP->>E: Configure to use Node B
E->>B: Connect for reads
CP->>B: PUT AttachedSingle
CP->>A: PUT Secondary
```
#### Cutover from an unavailable pageserver
This case is far simpler: we may skip straight to our intended
end state.
```mermaid
sequenceDiagram
participant A as Node A
participant CP as Control plane
participant B as Node B
participant E as Endpoint
note right of A: Node A offline
activate A
CP->>B: PUT AttachedSingle
CP->>E: Configure to use Node B
E->>B: Connect for reads
deactivate A
```
## Implementation (detail)
### Purpose of AttachedMulti, AttachedStale
#### AttachedMulti
Ordinarily, an attached pageserver whose generation is the latest may delete
layers at will (e.g. during compaction). If a previous generation pageserver
is also still attached, and in use by endpoints, then this layer deletion could
lead to a loss of availability for the endpoint when reading from the previous
generation pageserver.
The _AttachedMulti_ state simply disables deletions. These will be enqueued
in `RemoteTimelineClient` until the control plane transitions the
node into AttachedSingle, which unblocks deletions. Other remote storage operations
such as uploads are not blocked.
AttachedMulti is not required for data safety, only to preserve availability
on pageservers running with stale generations.
A node enters AttachedMulti only when explicitly asked to by the control plane. It should
only remain in this state for the duration of a migration.
If a control plane bug leaves
the node in AttachedMulti for a long time, then we must avoid unbounded memory use from enqueued
deletions. This may be accomplished simply, by dropping enqueued deletions when some modest
threshold of delayed deletions (e.g. 10k layers per tenant) is reached. As with all deletions,
it is safe to skip them, and the leaked objects will be eventually cleaned up by scrub or
by timeline deletion.
During AttachedMulti, the Tenant is free to drop layers from local disk in response to
disk pressure: only the deletion of remote layers is blocked.
#### AttachedStale
Currently, a pageserver with a stale generation number will continue to
upload layers, but be prevented from completing deletions. This is safe, but inefficient: layers uploaded by this stale generation
will not be read back by future generations of pageservers.
The _AttachedStale_ state disables S3 uploads. The stale pageserver
will continue to ingest the WAL and write layers to local disk, but not to
do any uploads to S3.
A node may enter AttachedStale in two ways:
- Explicitly, when control plane calls into the node at the start of a migration.
- Implicitly, when the node tries to validate some deletions and discovers
that its generation is stale.
The AttachedStale state also disables sending consumption metrics from
that location: it is interpreted as an indication that some other pageserver
is already attached or is about to be attached, and that new pageserver will
be responsible for sending consumption metrics.
#### Disk Pressure & AttachedStale
Over long periods of time, a tenant location in AttachedStale will accumulate data
on local disk, as it cannot evict any layers written since it entered the
AttachStale state. We rely on the control plane to revert the location to
Secondary or Detached at the end of a migration.
This scenario is particularly noteworthy when evacuating all tenants on a pageserver:
since _all_ the attached tenants will go into AttachedStale, we will be doing no
uploads at all, therefore ingested data will cause disk usage to increase continuously.
Under nominal conditions, the available disk space on pageservers should be sufficient
to complete the evacuation before this becomes a problem, but we must also handle
the case where we hit a low disk situation while in this state.
The concept of disk pressure already exists in the pageserver: the `disk_usage_eviction_task`
touches each Tenant when it determines that a low-disk condition requires
some layer eviction. Having selected layers for eviction, the eviction
task calls `Timeline::evict_layers`.
**Safety**: If evict_layers is called while in AttachedStale state, and some of the to-be-evicted
layers are not yet uploaded to S3, then the block on uploads will be lifted. This
will result in leaking some objects once a migration is complete, but will enable
the node to manage its disk space properly: if a node is left with some tenants
in AttachedStale indefinitely due to a network partition or control plane bug,
these tenants will not cause a full disk condition.
### Warm secondary updates
#### Layer heatmap
The secondary location's job is to serve reads **with the same quality of service as the original location
was serving them around the time of a migration**. This does not mean the secondary
location needs the whole set of layers: inactive layers that might soon
be evicted on the attached pageserver need not be downloaded by the
secondary. A totally idle tenant only needs to maintain enough on-disk
state to enable a fast cold start (i.e. the most recent image layers are
typically sufficient).
To enable this, we introduce the concept of a _layer heatmap_, which
acts as an advisory input to secondary locations to decide which
layers to download from S3.
#### Attached pageserver
The attached pageserver, if in state AttachedSingle, periodically
uploads a serialized heat map to S3. It may skip this if there
is no change since the last time it uploaded (e.g. if the tenant
is totally idle).
Additionally, when the tenant is flushed to remote storage prior to a migration
(the first step in [cutover procedure](#cutover-procedure)),
the heatmap is written out. This enables a future attached pageserver
to get an up to date view when deciding which layers to download.
#### Secondary location behavior
Secondary warm locations run a simple loop, implemented separately from
the main `Tenant` type, which represents attached tenants:
- Download the layer heatmap
- Select any "hot enough" layers to download, if there is sufficient
free disk space.
- Download layers, if they were not previously evicted (see below)
- Download the latest index_part.json
- Check if any layers currently on disk are no longer referenced by
IndexPart & delete them
Note that the heatmap is only advisory: if a secondary location has plenty
of disk space, it may choose to retain layers that aren't referenced
by the heatmap, as long as they are still referenced by the IndexPart. Conversely,
if a node is very low on disk space, it might opt to raise the heat threshold required
to both downloading a layer, until more disk space is available.
#### Secondary locations & disk pressure
Secondary locations are subject to eviction on disk pressure, just as
attached locations are. For eviction purposes, the access time of a
layer in a secondary location will be the access time given in the heatmap,
rather than the literal time at which the local layer file was accessed.
The heatmap will indicate which layers are in local storage on the attached
location. The secondary will always attempt to get back to having that
set of layers on disk, but to avoid flapping, it will remember the access
time of the layer it was most recently asked to evict, and layers whose
access time is below that will not be re-downloaded.
The resulting behavior is that after a layer is evicted from a secondary
location, it is only re-downloaded once the attached pageserver accesses
the layer and uploads a heatmap reflecting that access time. On a pageserver
restart, the secondary location will attempt to download all layers in
the heatmap again, if they are not on local disk.
This behavior will be slightly different when secondary locations are
used for "low energy tenants", but that is beyond the scope of this RFC.
### Location configuration API
Currently, the `/tenant/<tenant_id>/config` API defines various
tunables like compaction settings, which apply to the tenant irrespective
of which pageserver it is running on.
A new "location config" structure will be introduced, which defines
configuration which is per-tenant, but local to a particular pageserver,
such as the attachment mode and whether it is a secondary.
The pageserver will expose a new per-tenant API for setting
the state: `/tenant/<tenant_id>/location/config`.
Body content:
```
{
state: 'enum{Detached, Secondary, AttachedSingle, AttachedMulti, AttachedStale}',
generation: Option<u32>,
configuration: `Option<TenantConfig>`
flush: bool
}
```
Existing `/attach` and `/detach` endpoint will have the same
behavior as calling `/location/config` with `AttachedSingle` and `Detached`
states respectively. These endpoints will be deprecated and later
removed.
The generation attribute is mandatory for entering `AttachedSingle` or
`AttachedMulti`.
The configuration attribute is mandatory when entering any state other
than `Detached`. This configuration is the same as the body for
the existing `/tenant/<tenant_id>/config` endpoint.
The `flush` argument indicates whether the pageservers should flush
to S3 before proceeding: this only has any effect if the node is
currently in AttachedSingle or AttachedMulti. This is used
during the first phase of migration, when transitioning the
old pageserver to AttachedSingle.
The `/re-attach` API response will be extended to include a `state` as
well as a `generation`, enabling the pageserver to enter the
correct state for each tenant on startup.
### Database schema for locations
A new table `ProjectLocation`:
- pageserver_id: int
- tenant_id: TenantId
- generation: Option<int>
- state: `enum(Secondary, AttachedSingle, AttachedMulti)`
Notes:
- It is legacy for a Project to have zero `ProjectLocation`s
- The `pageserver` column in `Project` now means "to which pageserver should
endpoints connect", rather than simply which pageserver is attached.
- The `generation` column in `Project` remains, and is incremented and used
to set the generation of `ProjectLocation` rows when they are set into
an attached state.
- The `Detached` state is implicitly represented as the absence of
a `ProjectLocation`.
### Executing migrations
Migrations will be implemented as Go functions, within the
existing `Operation` framework in the control plane. These
operations are persistent, such that they will always keep
trying until completion: this property is important to avoid
leaving garbage behind on pageservers, such as AttachedStale
locations.
### Recovery from failures during migration
During migration, the control plane may encounter failures of either
the original or new pageserver, or both:
- If the original fails, skip past waiting for the new pageserver
to catch up, and put it into AttachedSingle immediately.
- If the new node fails, put the old pageserver into Secondary
and then back into AttachedSingle (this has the effect of
retaining on-disk state and granting it a fresh generation number).
- If both nodes fail, keep trying until one of them is available
again.
### Control plane -> Pageserver reconciliation
A migration may be done while the old node is unavailable,
in which case the old node may still be running in an AttachedStale
state.
In this case, it is undesirable to have the migration `Operation`
stay alive until the old node eventually comes back online
and can be cleaned up. To handle this, the control plane
should run a background reconciliation process to compare
a pageserver's attachments with the database, and clean up
any that shouldn't be there any more.
Note that there will be no work to do if the old node was really
offline, as during startup it will call into `/re-attach` and
be updated that way. The reconciliation will only be needed
if the node was unavailable but still running.
## Alternatives considered
### Only enabling secondary locations for tenants on a higher service tier
This will make sense in future, especially for tiny databases that may be
downloaded from S3 in milliseconds when needed.
However, it is not wise to do it immediately, because pageservers contain
a mixture of higher and lower tier workloads. If we had 1 tenant with
a secondary location and 9 without, then those other 9 tenants will do
a lot of I/O as they try to recover from S3, which may degrade the
service of the tenant which had a secondary location.
Until we segregate tenant on different service tiers on different pageserver
nodes, or implement & test QoS to ensure that tenants with secondaries are
not harmed by tenants without, we should use the same failover approach
for all the tenants.
### Hot secondary locations (continuous WAL replay)
Instead of secondary locations populating their caches from S3, we could
have them consume the WAL from safekeepers. The downsides of this would be:
- Double load on safekeepers, which are a less scalable service than S3
- Secondary locations' on-disk state would end up subtly different to
the remote state, which would make synchronizing with S3 more complex/expensive
when going into attached state.
The downside of only updating secondary locations from S3 is that we will
have a delay during migration from replaying the LSN range between what's
in S3 and what's in the pageserver. This range will be very small on
planned migrations, as we have the old pageserver flush to S3 immediately
before attaching the new pageserver. On unplanned migrations (old pageserver
is unavailable), the range of LSNs to replay is bounded by the flush frequency
on the old pageserver. However, the migration doesn't have to wait for the
replay: it's just that not-yet-replayed LSNs will be unavailable for read
until the new pageserver catches up.
We expect that pageserver reads of the most recent LSNs will be relatively
rare, as for an active endpoint those pages will usually still be in the postgres
page cache: this leads us to prefer synchronizing from S3 on secondary
locations, rather than consuming the WAL from safekeepers.
### Cold secondary locations
It is not functionally necessary to keep warm caches on secondary locations at all. However, if we do not, then
we would experience a de-facto availability loss in unplanned migrations, as reads to the new node would take an extremely long time (many seconds, perhaps minutes).
Warm caches on secondary locations are necessary to meet
our availability goals.
### Pageserver-granularity failover
Instead of migrating tenants individually, we could have entire spare nodes,
and on a node death, move all its work to one of these spares.
This approach is avoided for several reasons:
- we would still need fine-grained tenant migration for other
purposes such as balancing load
- by sharing the spare capacity over many peers rather than one spare node,
these peers may use the capacity for other purposes, until it is needed
to handle migrated tenants. e.g. for keeping a deeper cache of their
attached tenants.
### Readonly during migration
We could simplify migrations by making both previous and new nodes go into a
readonly state, then flush remote content from the previous node, then activate
attachment on the secondary node.
The downside to this approach is a potentially large gap in readability of
recent LSNs while loading data onto the new node. To avoid this, it is worthwhile
to incur the extra cost of double-replaying the WAL onto old and new nodes' local
storage during a migration.
### Peer-to-peer pageserver communication
Rather than uploading the heatmap to S3, attached pageservers could make it
available to peers.
Currently, pageservers have no peer to peer communication, so adding this
for heatmaps would incur significant overhead in deployment and configuration
of the service, and ensuring that when a new pageserver is deployed, other
pageservers are updated to be aware of it.
As well as simplifying implementation, putting heatmaps in S3 will be useful
for future analytics purposes -- gathering aggregated statistics on activity
pattersn across many tenants may be done directly from data in S3.

View File

@@ -107,7 +107,7 @@ pub const CHUNK_SIZE: usize = 1000;
// Just a wrapper around a slice of events
// to serialize it as `{"events" : [ ] }
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(serde::Serialize)]
pub struct EventChunk<'a, T: Clone> {
pub events: std::borrow::Cow<'a, [T]>,
}

View File

@@ -25,7 +25,11 @@ use tokio::io;
use toml_edit::Item;
use tracing::info;
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
pub use self::{
local_fs::LocalFs,
s3_bucket::S3Bucket,
simulate_failures::{SimulatedError, UnreliableWrapper},
};
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
@@ -190,26 +194,44 @@ impl Debug for Download {
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
///
/// This is only used by LOCAL_FS.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
///
/// This can only happen during download, never during delete.
NotFound,
/// The file was found in the remote storage, but the download failed.
/// The file was found in the remote storage, but the operation failed.
///
/// The error should have context already describing the real failed operation.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use DownloadError::*;
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
NotFound => write!(f, "No file found for the remote object id given"),
// this is same as thiserror error(transparent); it handles {} and {:#}
Other(e) | BadInput(e) => std::fmt::Display::fmt(e, f),
}
}
}
impl std::error::Error for DownloadError {}
impl std::error::Error for DownloadError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use DownloadError::*;
match self {
NotFound => None,
Other(_) | BadInput(_) => {
// TODO: these are anyhow, cannot return here
None
}
}
}
}
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.

View File

@@ -345,7 +345,7 @@ impl RemoteStorage for S3Bucket {
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list S3 prefixes")
.context("list S3 prefixes")
.map_err(DownloadError::Other);
let started_at = ScopeGuard::into_inner(started_at);
@@ -397,7 +397,7 @@ impl RemoteStorage for S3Bucket {
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list files in S3 bucket");
.context("list files in S3 bucket");
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
@@ -521,10 +521,7 @@ impl RemoteStorage for S3Bucket {
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
return Err(anyhow::anyhow!("delete {} objects", errors.len()));
}
}
Err(e) => {

View File

@@ -18,7 +18,7 @@ pub struct UnreliableWrapper {
}
/// Used to identify retries of different unique operation.
#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
Upload(RemotePath),
@@ -27,6 +27,22 @@ enum RemoteOp {
DeleteObjects(Vec<RemotePath>),
}
impl std::fmt::Debug for RemoteOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use RemoteOp::*;
match self {
ListPrefixes(arg0) => f.debug_tuple("ListPrefixes").field(arg0).finish(),
Upload(arg0) => f.debug_tuple("Upload").field(arg0).finish(),
Download(arg0) => f.debug_tuple("Download").field(arg0).finish(),
Delete(arg0) => f.debug_tuple("Delete").field(arg0).finish(),
DeleteObjects(many) if many.len() > 3 => {
write!(f, "DeleteObjects({} paths)", many.len())
}
DeleteObjects(few) => f.debug_tuple("DeleteObjects").field(few).finish(),
}
}
}
impl UnreliableWrapper {
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
assert!(attempts_to_fail > 0);
@@ -59,13 +75,12 @@ impl UnreliableWrapper {
e.remove();
Ok(attempts_before_this)
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
Err(DownloadError::Other(error))
}
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
e.insert(1);
Err(DownloadError::Other(error))
}
@@ -80,6 +95,26 @@ impl UnreliableWrapper {
}
}
/// `pub` type for checking if this is the root cause around logging.
///
/// This is just a string to avoid cloning a huge number of paths a second time.
#[derive(Debug)]
pub struct SimulatedError(String);
impl<'a> From<&'a RemoteOp> for SimulatedError {
fn from(value: &'_ RemoteOp) -> Self {
SimulatedError(format!("simulated failure of remote operation {:?}", value))
}
}
impl std::fmt::Display for SimulatedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl std::error::Error for SimulatedError {}
#[async_trait::async_trait]
impl RemoteStorage for UnreliableWrapper {
async fn list_prefixes(

View File

@@ -216,24 +216,6 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
}
}
/// When you will store a secret but want to make sure it won't
/// be accidentally logged, wrap it in a SecretString, whose Debug
/// implementation does not expose the contents.
#[derive(Clone, Eq, PartialEq)]
pub struct SecretString(String);
impl SecretString {
pub fn get_contents(&self) -> &str {
self.0.as_str()
}
}
impl std::fmt::Debug for SecretString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[SECRET]")
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};

View File

@@ -431,14 +431,14 @@ impl CgroupWatcher {
.context("failed to request upscale")?;
let memory_high =
self.get_memory_high_bytes().context("failed to get memory.high")?;
self.get_high_bytes().context("failed to get memory.high")?;
let new_high = memory_high + self.config.memory_high_increase_by_bytes;
info!(
current_high_bytes = memory_high,
new_high_bytes = new_high,
"updating memory.high"
);
self.set_memory_high_bytes(new_high)
self.set_high_bytes(new_high)
.context("failed to set memory.high")?;
last_memory_high_increase_at = Some(Instant::now());
continue;
@@ -556,6 +556,14 @@ impl CgroupWatcher {
}
}
/// Represents a set of limits we apply to a cgroup to control memory usage.
///
/// Setting these values also affects the thresholds for receiving usage alerts.
#[derive(Debug)]
pub struct MemoryLimits {
pub high: u64,
}
// Methods for manipulating the actual cgroup
impl CgroupWatcher {
/// Get a handle on the freezer subsystem.
@@ -616,29 +624,50 @@ impl CgroupWatcher {
}
/// Set cgroup memory.high threshold.
pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
}
/// Set the cgroup's memory.high to 'max', disabling it.
pub fn unset_memory_high(&self) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Max)
}
fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
self.memory()
.context("failed to get memory subsystem")?
.set_mem(cgroups_rs::memory::SetMemory {
low: None,
high: Some(value),
high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
min: None,
max: None,
})
.map_err(anyhow::Error::from)
.context("failed to set memory.high")
}
/// Set cgroup memory.high and memory.max.
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
info!(limits.high, path = self.path(), "writing new memory limits",);
self.memory()
.context("failed to get memory subsystem while setting memory limits")?
.set_mem(cgroups_rs::memory::SetMemory {
min: None,
low: None,
high: Some(MaxValue::Value(
u64::min(limits.high, i64::MAX as u64) as i64
)),
max: None,
})
.context("failed to set memory limits")
}
/// Given some amount of available memory, set the desired cgroup memory limits
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
let new_high = self.config.calculate_memory_high_value(available_memory);
let limits = MemoryLimits { high: new_high };
info!(
path = self.path(),
memory = ?limits,
"setting cgroup memory",
);
self.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
Ok(())
}
/// Get memory.high threshold.
pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> {
pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
let high = self
.memory()
.context("failed to get memory subsystem while getting memory statistics")?

View File

@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::cgroup::{CgroupWatcher, Sequenced};
use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
@@ -106,51 +106,6 @@ impl Runner {
kill,
};
// If we have both the cgroup and file cache integrations enabled, it's possible for
// temporary failures to result in cgroup throttling (from memory.high), that in turn makes
// it near-impossible to connect to the file cache (because it times out). Unfortunately,
// we *do* still want to determine the file cache size before setting the cgroup's
// memory.high, so it's not as simple as just swapping the order.
//
// Instead, the resolution here is that on vm-monitor startup (note: happens on each
// connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
// temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
// of a hacky solution, but helps with reliability.
if let Some(name) = &args.cgroup {
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
// now, and then set limits later.
info!("initializing cgroup");
let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
.context("failed to create cgroup manager")?;
info!("temporarily unsetting memory.high");
// Temporarily un-set cgroup memory.high; see above.
cgroup
.unset_memory_high()
.context("failed to unset memory.high")?;
let cgroup = Arc::new(cgroup);
let cgroup_clone = Arc::clone(&cgroup);
spawn_with_cancel(
token.clone(),
|_| error!("cgroup watcher terminated"),
async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
);
state.cgroup = Some(cgroup);
} else {
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
// This allows us to poll it in `Monitor::run` regardless of whether we
// are managing a cgroup or not. If we don't forget it, all receives will
// immediately return an error because the sender is droped and it will
// claim all select! statements, effectively turning `Monitor::run` into
// `loop { fail to receive }`.
mem::forget(requesting_send);
}
let mut file_cache_reserved_bytes = 0;
let mem = get_total_system_memory();
@@ -164,7 +119,7 @@ impl Runner {
false => FileCacheConfig::default_in_memory(),
};
let mut file_cache = FileCacheState::new(connstr, config, token)
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await
.context("failed to create file cache")?;
@@ -197,15 +152,35 @@ impl Runner {
state.filecache = Some(file_cache);
}
if let Some(cgroup) = &state.cgroup {
let available = mem - file_cache_reserved_bytes;
let value = cgroup.config.calculate_memory_high_value(available);
if let Some(name) = &args.cgroup {
let (mut cgroup, cgroup_event_stream) =
CgroupWatcher::new(name.clone(), requesting_send)
.context("failed to create cgroup manager")?;
info!(value, "setting memory.high");
let available = mem - file_cache_reserved_bytes;
cgroup
.set_memory_high_bytes(value)
.context("failed to set cgroup memory.high")?;
.set_memory_limits(available)
.context("failed to set cgroup memory limits")?;
let cgroup = Arc::new(cgroup);
// Some might call this . . . cgroup v2
let cgroup_clone = Arc::clone(&cgroup);
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
cgroup_clone.watch(notified_recv, cgroup_event_stream).await
});
state.cgroup = Some(cgroup);
} else {
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
// This allows us to poll it in `Monitor::run` regardless of whether we
// are managing a cgroup or not. If we don't forget it, all receives will
// immediately return an error because the sender is droped and it will
// claim all select! statements, effectively turning `Monitor::run` into
// `loop { fail to receive }`.
mem::forget(requesting_send);
}
Ok(state)
@@ -282,11 +257,14 @@ impl Runner {
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
}
// new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
// since it is properly initialized in the previous cgroup if let block
let limits = MemoryLimits {
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
// since it is properly initialized in the previous cgroup if let block
high: new_cgroup_mem_high,
};
cgroup
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
let message = format!(
"set cgroup memory.high to {} MiB, of new max {} MiB",
@@ -349,9 +327,12 @@ impl Runner {
name = cgroup.path(),
"updating cgroup memory.high",
);
let limits = MemoryLimits {
high: new_cgroup_mem_high,
};
cgroup
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
.set_limits(&limits)
.context("failed to set file cache size")?;
}
Ok(())

View File

@@ -644,7 +644,7 @@ fn create_remote_storage_client(
let config = if let Some(config) = &conf.remote_storage_config {
config
} else {
tracing::warn!("no remote storage configured, this is a deprecated configuration");
// No remote storage configured.
return Ok(None);
};

View File

@@ -11,7 +11,6 @@ use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
use utils::logging::SecretString;
use once_cell::sync::OnceCell;
use reqwest::Url;
@@ -208,9 +207,6 @@ pub struct PageServerConf {
pub background_task_maximum_delay: Duration,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -287,7 +283,6 @@ struct PageServerConfigBuilder {
background_task_maximum_delay: BuilderValue<Duration>,
control_plane_api: BuilderValue<Option<Url>>,
control_plane_api_token: BuilderValue<Option<SecretString>>,
}
impl Default for PageServerConfigBuilder {
@@ -352,7 +347,6 @@ impl Default for PageServerConfigBuilder {
.unwrap()),
control_plane_api: Set(None),
control_plane_api_token: Set(None),
}
}
}
@@ -573,9 +567,6 @@ impl PageServerConfigBuilder {
control_plane_api: self
.control_plane_api
.ok_or(anyhow!("missing control_plane_api"))?,
control_plane_api_token: self
.control_plane_api_token
.ok_or(anyhow!("missing control_plane_api_token"))?,
})
}
}
@@ -954,7 +945,6 @@ impl PageServerConf {
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::ZERO,
control_plane_api: None,
control_plane_api_token: None,
}
}
}
@@ -1178,8 +1168,7 @@ background_task_maximum_delay = '334 s'
background_task_maximum_delay: humantime::parse_duration(
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
)?,
control_plane_api: None,
control_plane_api_token: None
control_plane_api: None
},
"Correct defaults should be used when no config values are provided"
);
@@ -1235,8 +1224,7 @@ background_task_maximum_delay = '334 s'
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::from_secs(334),
control_plane_api: None,
control_plane_api_token: None
control_plane_api: None
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -53,16 +53,12 @@ impl ControlPlaneClient {
segs.pop_if_empty().push("");
}
let mut client = reqwest::ClientBuilder::new();
if let Some(jwt) = &conf.control_plane_api_token {
let mut headers = hyper::HeaderMap::new();
headers.insert("Authorization", jwt.get_contents().parse().unwrap());
client = client.default_headers(headers);
}
let client = reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
Some(Self {
http_client: client.build().expect("Failed to construct HTTP client"),
http_client: client,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),

View File

@@ -291,14 +291,6 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_resident_physical_size_global",
"Like `pageserver_resident_physical_size`, but without tenant/timeline dimensions."
)
.expect("failed to define a metric")
});
static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_remote_physical_size",
@@ -309,14 +301,6 @@ static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static REMOTE_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_remote_physical_size_global",
"Like `pageserver_remote_physical_size`, but without tenant/timeline dimensions."
)
.expect("failed to define a metric")
});
pub(crate) static REMOTE_ONDEMAND_DOWNLOADED_LAYERS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_remote_ondemand_downloaded_layers_total",
@@ -1225,7 +1209,7 @@ pub struct TimelineMetrics {
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
resident_physical_size_gauge: UIntGauge,
pub resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub num_persistent_files_created: IntCounter,
@@ -1303,29 +1287,10 @@ impl TimelineMetrics {
}
pub fn record_new_file_metrics(&self, sz: u64) {
self.resident_physical_size_add(sz);
self.resident_physical_size_gauge.add(sz);
self.num_persistent_files_created.inc_by(1);
self.persistent_bytes_written.inc_by(sz);
}
pub fn resident_physical_size_sub(&self, sz: u64) {
self.resident_physical_size_gauge.sub(sz);
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(sz);
}
pub fn resident_physical_size_add(&self, sz: u64) {
self.resident_physical_size_gauge.add(sz);
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.add(sz);
}
pub fn resident_physical_size_set(&self, sz: u64) {
self.resident_physical_size_gauge.set(sz);
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.set(sz);
}
pub fn resident_physical_size_get(&self) -> u64 {
self.resident_physical_size_gauge.get()
}
}
impl Drop for TimelineMetrics {
@@ -1333,10 +1298,7 @@ impl Drop for TimelineMetrics {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
}
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
@@ -1390,43 +1352,10 @@ use std::time::{Duration, Instant};
use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
/// Maintain a per timeline gauge in addition to the global gauge.
struct PerTimelineRemotePhysicalSizeGauge {
last_set: u64,
gauge: UIntGauge,
}
impl PerTimelineRemotePhysicalSizeGauge {
fn new(per_timeline_gauge: UIntGauge) -> Self {
Self {
last_set: per_timeline_gauge.get(),
gauge: per_timeline_gauge,
}
}
fn set(&mut self, sz: u64) {
self.gauge.set(sz);
if sz < self.last_set {
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set - sz);
} else {
REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - self.last_set);
};
self.last_set = sz;
}
fn get(&self) -> u64 {
self.gauge.get()
}
}
impl Drop for PerTimelineRemotePhysicalSizeGauge {
fn drop(&mut self) {
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set);
}
}
pub struct RemoteTimelineClientMetrics {
tenant_id: String,
timeline_id: String,
remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
@@ -1444,24 +1373,18 @@ impl RemoteTimelineClientMetrics {
}
}
pub(crate) fn remote_physical_size_set(&self, sz: u64) {
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
let gauge = guard.get_or_insert_with(|| {
PerTimelineRemotePhysicalSizeGauge::new(
guard
.get_or_insert_with(|| {
REMOTE_PHYSICAL_SIZE
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
])
.unwrap(),
)
});
gauge.set(sz);
}
pub(crate) fn remote_physical_size_get(&self) -> u64 {
let guard = self.remote_physical_size_gauge.lock().unwrap();
guard.as_ref().map(|gauge| gauge.get()).unwrap_or(0)
.unwrap()
})
.clone()
}
pub fn remote_operation_time(

View File

@@ -453,11 +453,11 @@ impl RemoteTimelineClient {
} else {
0
};
self.metrics.remote_physical_size_set(size);
self.metrics.remote_physical_size_gauge().set(size);
}
pub fn get_remote_physical_size(&self) -> u64 {
self.metrics.remote_physical_size_get()
self.metrics.remote_physical_size_gauge().get()
}
//
@@ -1161,7 +1161,11 @@ impl RemoteTimelineClient {
// at info level at first, and only WARN if the operation fails repeatedly.
//
// (See similar logic for downloads in `download::download_retry`)
if retries < FAILED_UPLOAD_WARN_THRESHOLD {
let is_simulated = cfg!(feature = "testing")
&& e.root_cause().is::<remote_storage::SimulatedError>();
if retries < FAILED_UPLOAD_WARN_THRESHOLD || is_simulated {
info!(
"failed to perform remote task {}, will retry (attempt {}): {:#}",
task.op, retries, e

View File

@@ -559,7 +559,7 @@ impl Timeline {
}
pub fn resident_physical_size(&self) -> u64 {
self.metrics.resident_physical_size_get()
self.metrics.resident_physical_size_gauge.get()
}
///
@@ -1309,7 +1309,10 @@ impl Timeline {
// will treat the file as a local layer again, count it towards resident size,
// and it'll be like the layer removal never happened.
// The bump in resident size is perhaps unexpected but overall a robust behavior.
self.metrics.resident_physical_size_sub(layer_file_size);
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
self.metrics.evictions.inc();
if let Some(delta) = local_layer_residence_duration {
@@ -1843,7 +1846,9 @@ impl Timeline {
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
);
self.metrics.resident_physical_size_set(total_physical_size);
self.metrics
.resident_physical_size_gauge
.set(total_physical_size);
timer.stop_and_record();
Ok(())
@@ -4393,7 +4398,7 @@ impl Timeline {
// XXX the temp file is still around in Err() case
// and consumes space until we clean up upon pageserver restart.
self_clone.metrics.resident_physical_size_add(*size);
self_clone.metrics.resident_physical_size_gauge.add(*size);
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.

View File

@@ -263,7 +263,7 @@ impl LayerManager {
let desc = layer.layer_desc();
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
metrics.resident_physical_size_sub(desc.file_size);
metrics.resident_physical_size_gauge.sub(desc.file_size);
}
// TODO Removing from the bottom of the layer map is expensive.

View File

@@ -1790,14 +1790,6 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if (!XLogInsertAllowed())
return;
/* ensure we have enough xlog buffers to log max-sized records */
XLogEnsureRecordSpace(Min(remblocks, (XLR_MAX_BLOCK_ID - 1)), 0);
/*
* Iterate over all the pages. They are collected into batches of
* XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
* batch.
*/
while (remblocks > 0)
{
int count = Min(remblocks, XLR_MAX_BLOCK_ID);

View File

@@ -42,7 +42,6 @@ reqwest-middleware.workspace = true
reqwest-retry.workspace = true
reqwest-tracing.workspace = true
routerify.workspace = true
rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
scopeguard.workspace = true

View File

@@ -17,12 +17,11 @@ use std::{
use tokio::time;
use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
};
use crate::{auth, console};
use crate::{compute, config};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::ConnectMechanism;
use tracing::{error, warn};
@@ -401,6 +400,7 @@ async fn connect_to_compute_once(
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
@@ -412,10 +412,6 @@ async fn connect_to_compute_once(
span.in_scope(|| {
info!(%conn_info, %session, "new connection");
});
let ids = Ids {
endpoint_id: node_info.aux.endpoint_id.to_string(),
branch_id: node_info.aux.branch_id.to_string(),
};
tokio::spawn(
poll_fn(move |cx| {
@@ -454,18 +450,10 @@ async fn connect_to_compute_once(
Ok(Client {
inner: client,
session: tx,
ids,
})
}
pub struct Client {
pub inner: tokio_postgres::Client,
session: tokio::sync::watch::Sender<uuid::Uuid>,
ids: Ids,
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.ids.clone())
}
}

View File

@@ -3,12 +3,10 @@ use std::sync::Arc;
use anyhow::bail;
use futures::pin_mut;
use futures::StreamExt;
use hashbrown::HashMap;
use hyper::body::HttpBody;
use hyper::header;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Map;
@@ -18,11 +16,7 @@ use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use tracing::error;
use tracing::instrument;
use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -45,6 +39,7 @@ enum Payload {
Batch(BatchQueryData),
}
pub const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
@@ -187,45 +182,7 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> Result<Response<Body>, ApiError> {
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let mut response = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
};
response.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
Ok(response)
}
#[instrument(name = "sql-over-http", skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
//
// Determine the destination and connection params
//
@@ -276,18 +233,13 @@ async fn handle_inner(
let mut client = conn_pool.get(&conn_info, !allow_pool, session_id).await?;
let mut response = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json");
//
// Now execute the query and return the result
//
let mut size = 0;
let result = match payload {
Payload::Single(query) => {
query_to_json(&client.inner, query, &mut size, raw_output, array_mode).await
}
Payload::Single(query) => query_to_json(&client.inner, query, raw_output, array_mode)
.await
.map(|x| (x, HashMap::default())),
Payload::Batch(batch_query) => {
let mut results = Vec::new();
let mut builder = client.inner.build_transaction();
@@ -302,8 +254,7 @@ async fn handle_inner(
}
let transaction = builder.start().await?;
for query in batch_query.queries {
let result =
query_to_json(&transaction, query, &mut size, raw_output, array_mode).await;
let result = query_to_json(&transaction, query, raw_output, array_mode).await;
match result {
Ok(r) => results.push(r),
Err(e) => {
@@ -313,27 +264,26 @@ async fn handle_inner(
}
}
transaction.commit().await?;
let mut headers = HashMap::default();
if txn_read_only {
response = response.header(
headers.insert(
TXN_READ_ONLY.clone(),
HeaderValue::try_from(txn_read_only.to_string())?,
);
}
if txn_deferrable {
response = response.header(
headers.insert(
TXN_DEFERRABLE.clone(),
HeaderValue::try_from(txn_deferrable.to_string())?,
);
}
if let Some(txn_isolation_level) = txn_isolation_level_raw {
response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
headers.insert(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
}
Ok(json!({ "results": results }))
Ok((json!({ "results": results }), headers))
}
};
let metrics = client.metrics();
if allow_pool {
let current_span = tracing::Span::current();
// return connection to the pool
@@ -343,30 +293,12 @@ async fn handle_inner(
});
}
match result {
Ok(value) => {
// how could this possibly fail
let body = serde_json::to_string(&value).expect("json serialization should not fail");
let len = body.len();
let response = response
.body(Body::from(body))
// only fails if invalid status code or invalid header/values are given.
// these are not user configurable so it cannot fail dynamically
.expect("building response payload should not fail");
// count the egress bytes - we miss the TLS and header overhead but oh well...
// moving this later in the stack is going to be a lot of effort and ehhhh
metrics.record_egress(len as u64);
Ok(response)
}
Err(e) => Err(e),
}
result
}
async fn query_to_json<T: GenericClient>(
client: &T,
data: QueryData,
current_size: &mut usize,
raw_output: bool,
array_mode: bool,
) -> anyhow::Result<Value> {
@@ -380,10 +312,16 @@ async fn query_to_json<T: GenericClient>(
// big.
pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
let mut current_size = 0;
while let Some(row) = row_stream.next().await {
let row = row?;
*current_size += row.body_len();
current_size += row.body_len();
rows.push(row);
if current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!(
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
));
}
}
// grab the command tag and number of rows affected

View File

@@ -7,6 +7,7 @@ use crate::{
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hashbrown::HashMap;
use hyper::{
server::{
accept,
@@ -17,6 +18,7 @@ use hyper::{
};
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
use pin_project_lite::pin_project;
use serde_json::{json, Value};
use std::{
convert::Infallible,
@@ -202,7 +204,44 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
let result = sql_over_http::handle(request, sni_hostname, conn_pool, session_id)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
};
let (json, headers) = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
(
json!({ "message": message, "code": code }),
HashMap::default(),
)
}
};
json_response(status_code, json).map(|mut r| {
r.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
for (k, v) in headers {
r.headers_mut().insert(k, v);
}
r
})
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -214,7 +253,7 @@ async fn ws_handler(
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
.body(Body::empty())
.map_err(|e| ApiError::InternalServerError(e.into()))
.map_err(|e| ApiError::BadRequest(e.into()))
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}

View File

@@ -3,18 +3,9 @@
use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tracing::{error, info, instrument, trace};
use serde::Serialize;
use std::{collections::HashMap, convert::Infallible, time::Duration};
use tracing::{error, info, instrument, trace, warn};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
@@ -27,95 +18,12 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
pub struct Ids {
pub endpoint_id: String,
pub branch_id: String,
}
#[derive(Debug)]
pub struct MetricCounter {
transmitted: AtomicU64,
opened_connections: AtomicUsize,
}
impl MetricCounter {
/// Record that some bytes were sent from the proxy to the client
pub fn record_egress(&self, bytes: u64) {
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
}
/// extract the value that should be reported
fn should_report(self: &Arc<Self>) -> Option<u64> {
// heuristic to see if the branch is still open
// if a clone happens while we are observing, the heuristic will be incorrect.
//
// Worst case is that we won't report an event for this endpoint.
// However, for the strong count to be 1 it must have occured that at one instant
// all the endpoints were closed, so missing a report because the endpoints are closed is valid.
let is_open = Arc::strong_count(self) > 1;
let opened = self.opened_connections.swap(0, Ordering::AcqRel);
// update cached metrics eagerly, even if they can't get sent
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
let value = self.transmitted.swap(0, Ordering::AcqRel);
// Our only requirement is that we report in every interval if there was an open connection
// if there were no opened connections since, then we don't need to report
if value == 0 && !is_open && opened == 0 {
None
} else {
Some(value)
}
}
/// Determine whether the counter should be cleared from the global map.
fn should_clear(self: &mut Arc<Self>) -> bool {
// we can't clear this entry if it's acquired elsewhere
let Some(counter) = Arc::get_mut(self) else {
return false;
};
let opened = *counter.opened_connections.get_mut();
let value = *counter.transmitted.get_mut();
// clear if there's no data to report
value == 0 && opened == 0
}
}
// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
#[derive(Default)]
pub struct Metrics {
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
}
impl Metrics {
/// Register a new byte metrics counter for this endpoint
pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
let entry = if let Some(entry) = self.endpoints.get(&ids) {
entry.clone()
} else {
self.endpoints
.entry(ids)
.or_insert_with(|| {
Arc::new(MetricCounter {
transmitted: AtomicU64::new(0),
opened_connections: AtomicUsize::new(0),
})
})
.clone()
};
entry.opened_connections.fetch_add(1, Ordering::AcqRel);
entry
}
}
pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
info!("metrics collector config: {config:?}");
scopeguard::defer! {
@@ -123,83 +31,145 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
}
let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
let mut prev = Utc::now();
let mut ticker = tokio::time::interval(config.interval);
loop {
ticker.tick().await;
let now = Utc::now();
collect_metrics_iteration(
&USAGE_METRICS,
let res = collect_metrics_iteration(
&http_client,
&mut cached_metrics,
&config.endpoint,
&hostname,
prev,
now,
)
.await;
prev = now;
match res {
Err(e) => error!("failed to send consumption metrics: {e} "),
Ok(_) => trace!("periodic metrics collection completed successfully"),
}
}
}
fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
let metrics = prometheus::default_registry().gather();
for m in metrics {
if m.get_name() == "proxy_io_bytes_per_client" {
for ms in m.get_metric() {
let direction = ms
.get_label()
.iter()
.find(|l| l.get_name() == "direction")
.unwrap()
.get_value();
// Only collect metric for outbound traffic
if direction == "tx" {
let endpoint_id = ms
.get_label()
.iter()
.find(|l| l.get_name() == "endpoint_id")
.unwrap()
.get_value();
let branch_id = ms
.get_label()
.iter()
.find(|l| l.get_name() == "branch_id")
.unwrap()
.get_value();
let value = ms.get_counter().get_value() as u64;
// Report if the metric value is suspiciously large
if value > (1u64 << 40) {
warn!(
"potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
}
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
branch_id: branch_id.to_string(),
},
(value, Utc::now()),
));
}
}
}
}
current_metrics
}
#[instrument(skip_all)]
async fn collect_metrics_iteration(
metrics: &Metrics,
client: &http::ClientWithMiddleware,
cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
metric_collection_endpoint: &reqwest::Url,
hostname: &str,
prev: DateTime<Utc>,
now: DateTime<Utc>,
) {
) -> anyhow::Result<()> {
info!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
let mut metrics_to_clear = Vec::new();
let current_metrics = gather_proxy_io_bytes_per_client();
let metrics_to_send: Vec<(Ids, u64)> = metrics
.endpoints
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
.iter()
.filter_map(|counter| {
let key = counter.key().clone();
let Some(value) = counter.should_report() else {
metrics_to_clear.push(key);
return None;
.filter_map(|(curr_key, (curr_val, curr_time))| {
let mut start_time = *curr_time;
let mut value = *curr_val;
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
// Only send metrics updates if the metric has increased
if curr_val > prev_val {
value = curr_val - prev_val;
start_time = *prev_time;
} else {
if curr_val < prev_val {
error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
prev_val, curr_val, curr_key);
}
return None;
}
};
Some((key, value))
Some(Event {
kind: EventType::Incremental {
start_time,
stop_time: *curr_time,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value,
extra: Ids {
endpoint_id: curr_key.endpoint_id.clone(),
branch_id: curr_key.branch_id.clone(),
},
})
})
.collect();
if metrics_to_send.is_empty() {
trace!("no new metrics to send");
return Ok(());
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
let events = chunk
.iter()
.map(|(ids, value)| Event {
kind: EventType::Incremental {
start_time: prev,
stop_time: now,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value: *value,
extra: Ids {
endpoint_id: ids.endpoint_id.clone(),
branch_id: ids.branch_id.clone(),
},
})
.collect();
let res = client
.post(metric_collection_endpoint.clone())
.json(&EventChunk { events })
.json(&EventChunk {
events: chunk.into(),
})
.send()
.await;
@@ -213,113 +183,34 @@ async fn collect_metrics_iteration(
if !res.status().is_success() {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
error!("potentially abnormal metric value: {:?}", metric);
}
}
}
// update cached metrics after they were sent
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
for send_metric in chunk {
let stop_time = match send_metric.kind {
EventType::Incremental { stop_time, .. } => stop_time,
_ => unreachable!(),
};
for metric in metrics_to_clear {
match metrics.endpoints.entry(metric) {
Entry::Occupied(mut counter) => {
if counter.get_mut().should_clear() {
counter.remove_entry();
}
}
Entry::Vacant(_) => {}
cached_metrics
.entry(Ids {
endpoint_id: send_metric.extra.endpoint_id.clone(),
branch_id: send_metric.extra.branch_id.clone(),
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 = e.0.saturating_add(send_metric.value);
e.1 = stop_time
})
// cache new metric
.or_insert((send_metric.value, stop_time));
}
}
}
#[cfg(test)]
mod tests {
use std::{
net::TcpListener,
sync::{Arc, Mutex},
};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use hyper::{
service::{make_service_fn, service_fn},
Body, Response,
};
use url::Url;
use super::{collect_metrics_iteration, Ids, Metrics};
use crate::http;
#[tokio::test]
async fn metrics() {
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
let reports = Arc::new(Mutex::new(vec![]));
let reports2 = reports.clone();
let server = hyper::server::Server::from_tcp(listener)
.unwrap()
.serve(make_service_fn(move |_| {
let reports = reports.clone();
async move {
Ok::<_, Error>(service_fn(move |req| {
let reports = reports.clone();
async move {
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let events: EventChunk<'static, Event<Ids, String>> =
serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(Body::from(vec![])))
}
}))
}
}));
let addr = server.local_addr();
tokio::spawn(server);
let metrics = Metrics::default();
let client = http::new_client();
let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
let now = Utc::now();
// no counters have been registered
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert!(r.is_empty());
// register a new counter
let counter = metrics.register(Ids {
endpoint_id: "e1".to_string(),
branch_id: "b1".to_string(),
});
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
// record egress
counter.record_egress(1);
// egress should be observered
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
// release counter
drop(counter);
// we do not observe the counter
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered
assert!(metrics.endpoints.is_empty());
}
Ok(())
}

View File

@@ -129,18 +129,12 @@ impl<T: AsyncRead> WithClientIp<T> {
// exit for bad header
let len = usize::min(self.buf.len(), HEADER.len());
if self.buf[..len] != HEADER[..len] {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid proxy protocol v2 header",
)));
return Poll::Ready(Ok(None));
}
// if no more bytes available then exit
if ready!(bytes_read) == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"missing proxy protocol v2 header",
)));
return Poll::Ready(Ok(None));
};
}
@@ -152,27 +146,27 @@ impl<T: AsyncRead> WithClientIp<T> {
let command = vc & 0b1111;
if version != 2 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
io::ErrorKind::Other,
"invalid proxy protocol version. expected version 2",
)));
}
let local = match command {
match command {
// the connection was established on purpose by the proxy
// without being relayed. The connection endpoints are the sender and the
// receiver. Such connections exist when the proxy sends health-checks to the
// server. The receiver must accept this connection as valid and must use the
// real connection endpoints and discard the protocol block including the
// family which is ignored.
0 => true,
0 => {}
// the connection was established on behalf of another node,
// and reflects the original connection endpoints. The receiver must then use
// the information provided in the protocol block to get original the address.
1 => false,
1 => {}
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
_ => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
io::ErrorKind::Other,
"invalid proxy protocol command. expected local (0) or proxy (1)",
)))
}
@@ -192,29 +186,8 @@ impl<T: AsyncRead> WithClientIp<T> {
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
0x21 | 0x22 => 36,
// - \x31 : UNIX stream : the forwarded connection uses SOCK_STREAM over the
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
// - \x32 : UNIX datagram : the forwarded connection uses SOCK_DGRAM over the
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
0x31 | 0x32 => 216,
// UNSPEC : the connection is forwarded for an unknown, unspecified
// or unsupported protocol. The sender should use this family when sending
// LOCAL commands or when dealing with unsupported protocol families. When
// used with a LOCAL command, the receiver must accept the connection and
// ignore any address information. For other commands, the receiver is free
// to accept the connection anyway and use the real endpoints addresses or to
// reject the connection. The receiver should ignore address information.
0x00 | 0x01 | 0x02 | 0x10 | 0x20 | 0x30 if local => 0,
// unspecified or invalid. ignore the addresses
_ => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid proxy protocol address family/transport protocol",
)))
}
// unspecified or unix stream. ignore the addresses
_ => 0,
};
// The 15th and 16th bytes is the address length in bytes in network endian order.
@@ -446,7 +419,6 @@ mod tests {
}
#[tokio::test]
#[should_panic(expected = "invalid proxy protocol v2 header")]
async fn test_invalid() {
let data = [0x55; 256];
@@ -454,15 +426,20 @@ mod tests {
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
}
#[tokio::test]
#[should_panic(expected = "missing proxy protocol v2 header")]
async fn test_short() {
let mut read = pin!(WithClientIp::new(&super::HEADER.as_slice()[..10]));
let data = [0x55; 10];
let mut read = pin!(WithClientIp::new(data.as_slice()));
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
}
#[tokio::test]

View File

@@ -7,7 +7,6 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
};
@@ -603,11 +602,6 @@ pub async fn proxy_pass(
compute: impl AsyncRead + AsyncWrite + Unpin,
aux: &MetricsAuxInfo,
) -> anyhow::Result<()> {
let usage = USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id.to_string(),
branch_id: aux.branch_id.to_string(),
});
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("tx"));
let mut client = MeasuredStream::new(
client,
@@ -615,7 +609,6 @@ pub async fn proxy_pass(
|cnt| {
// Number of bytes we sent to the client (outbound).
m_sent.inc_by(cnt as u64);
usage.record_egress(cnt as u64);
},
);

View File

@@ -105,8 +105,6 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
# note that neon_simple_env now uses LOCAL_FS remote storage
# Create tenant
tenant_conf: Dict[str, str] = {}
if False: # TODO add pytest setting for this

View File

@@ -460,11 +460,9 @@ class NeonEnvBuilder:
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
if default_remote_storage_if_missing and self.pageserver_remote_storage is None:
self.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
self.env = NeonEnv(self)
return self.env
@@ -472,19 +470,8 @@ class NeonEnvBuilder:
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, str]] = None,
default_remote_storage_if_missing: bool = True,
) -> NeonEnv:
"""
Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
To avoid creating initial_tenant, call init_configs to setup the environment.
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
"""
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> NeonEnv:
env = self.init_configs()
self.start()
# Prepare the default branch to start the postgres on later.
@@ -559,7 +546,7 @@ class NeonEnvBuilder:
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> RemoteStorage:
) -> Optional[RemoteStorage]:
ret = kind.configure(
self.repo_dir,
self.mock_s3_server,
@@ -902,8 +889,6 @@ def _shared_simple_env(
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `neon_simple_env`.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
if os.environ.get("TEST_SHARED_FIXTURES") is None:

View File

@@ -202,6 +202,9 @@ class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"
def configure(
self,
@@ -212,7 +215,10 @@ class RemoteStorageKind(str, enum.Enum):
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> RemoteStorage:
) -> Optional[RemoteStorage]:
if self == RemoteStorageKind.NOOP:
return None
if self == RemoteStorageKind.LOCAL_FS:
return LocalFsStorage(LocalFsStorage.component_path(repo_dir, user))

View File

@@ -4,12 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.types import TenantId, TimelineId
@@ -31,18 +26,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
for _ in range(3):
for _ in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
# Stop the pageserver -- this has to be not immediate or we need to wait for uploads
# Stop the pageserver
env.pageserver.stop()
# Leave the first timeline alone, but corrupt the others in different ways
@@ -51,21 +45,30 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
with open(metadata_path, "w") as f:
f.write("overwritten with garbage!")
f = open(metadata_path, "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(f"{timeline_path}/{filename}")
log.info(
f"Timeline {tenant2}/{timeline2} got its layer files removed (no remote storage enabled)"
)
(tenant3, timeline3, pg3) = tenant_timelines[3]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant3}/timelines/{timeline3}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Corrupt it
p = f"{timeline_path}/{filename}"
size = os.path.getsize(p)
with open(p, "wb") as f:
f.truncate(0)
f.truncate(size)
log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled")
f = open(f"{timeline_path}/{filename}", "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant3}/{timeline3} got its layer files spoiled")
env.pageserver.start()
@@ -84,13 +87,22 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
)
# Second timeline will fail during basebackup, because the local layer file is corrupt.
# Second timeline has no ancestors, only the metadata file and no layer files locally,
# and we don't have the remote storage enabled. It is loaded into memory, but getting
# the basebackup from it will fail.
with pytest.raises(
Exception, match=f"Tenant {tenant2} will not become active. Current state: Broken"
) as err:
pg2.start()
log.info(f"As expected, compute startup failed for timeline with missing layers: {err}")
# Third timeline will also fail during basebackup, because the layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
with pytest.raises(Exception, match="Failed to load delta layer") as err:
pg2.start()
pg3.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"
)

View File

@@ -74,13 +74,11 @@ class EvictionEnv:
pgbench_init_lsns: Dict[TenantId, Lsn]
def timelines_du(self) -> Tuple[int, int, int]:
return poor_mans_du(
self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
)
return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid in self.timelines])
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
return {
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)])[0]
for tid, tlid in self.timelines
}
@@ -91,21 +89,7 @@ class EvictionEnv:
"""
lsn = self.pgbench_init_lsns[tenant_id]
with self.neon_env.endpoints.create_start("main", tenant_id=tenant_id, lsn=lsn) as endpoint:
# instead of using pgbench --select-only which does point selects,
# run full table scans for all tables
with endpoint.connect() as conn:
cur = conn.cursor()
tables_cols = {
"pgbench_accounts": "abalance",
"pgbench_tellers": "tbalance",
"pgbench_branches": "bbalance",
"pgbench_history": "delta",
}
for table, column in tables_cols.items():
cur.execute(f"select avg({column}) from {table}")
_avg = cur.fetchone()
self.pg_bin.run(["pgbench", "-S", endpoint.connstr()])
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
@@ -143,19 +127,6 @@ class EvictionEnv:
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
def human_bytes(amt: float) -> str:
suffixes = ["", "Ki", "Mi", "Gi"]
last = suffixes[-1]
for name in suffixes:
if amt < 1024 or name == last:
return f"{int(round(amt))} {name}B"
amt = amt / 1024
raise RuntimeError("unreachable")
@pytest.fixture
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
"""
@@ -244,12 +215,8 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
broken_size_pre, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
)
healthy_size_pre, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
)
broken_size_pre, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
healthy_size_pre, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
# try to evict everything, then validate that broken tenant wasn't touched
target = broken_size_pre + healthy_size_pre
@@ -257,12 +224,8 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
response = env.pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
log.info(f"{response}")
broken_size_post, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
)
healthy_size_post, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
)
broken_size_post, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
healthy_size_post, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
assert healthy_size_post < healthy_size_pre
@@ -403,16 +366,18 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
du_by_timeline = env.du_by_timeline()
# pick any tenant
[warm, cold] = list(du_by_timeline.keys())
(tenant_id, timeline_id) = warm
[our_tenant, other_tenant] = list(du_by_timeline.keys())
(tenant_id, timeline_id) = our_tenant
# make picked tenant more recently used than the other one
# make our tenant more recently used than the other one
env.warm_up_tenant(tenant_id)
# Build up enough pressure to require evictions from both tenants,
# but not enough to fall into global LRU.
# So, set target to all occupied space, except 2*env.layer_size per tenant
target = du_by_timeline[cold] + (du_by_timeline[warm] // 2) - 2 * 2 * env.layer_size
# So, set target to all occipied space, except 2*env.layer_size per tenant
target = (
du_by_timeline[other_tenant] + (du_by_timeline[our_tenant] // 2) - 2 * 2 * env.layer_size
)
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
log.info(f"{response}")
@@ -427,33 +392,22 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
later_tenant_usage < du_by_timeline[tenant]
), "all tenants should have lost some layers"
warm_size = later_du_by_timeline[warm]
# bounds for warmed_size
warm_lower = 0.5 * du_by_timeline[warm]
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
# So, check for up to 3 here.
warm_upper = warm_lower + 3 * env.layer_size
cold_size = later_du_by_timeline[cold]
cold_upper = 2 * env.layer_size
log.info(
f"expecting for warm tenant: {human_bytes(warm_lower)} < {human_bytes(warm_size)} < {human_bytes(warm_upper)}"
)
log.info(f"expecting for cold tenant: {human_bytes(cold_size)} < {human_bytes(cold_upper)}")
assert warm_size > warm_lower, "warmed up tenant should be at about half size (lower)"
assert warm_size < warm_upper, "warmed up tenant should be at about half size (upper)"
assert (
cold_size < cold_upper
), "the cold tenant should be evicted to its min_resident_size, i.e., max layer file size"
later_du_by_timeline[our_tenant] > 0.5 * du_by_timeline[our_tenant]
), "our warmed up tenant should be at about half capacity, part 1"
assert (
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
# So, check for up to 3 here.
later_du_by_timeline[our_tenant]
< 0.5 * du_by_timeline[our_tenant] + 3 * env.layer_size
), "our warmed up tenant should be at about half capacity, part 2"
assert (
later_du_by_timeline[other_tenant] < 2 * env.layer_size
), "the other tenant should be evicted to is min_resident_size, i.e., max layer file size"
def poor_mans_du(
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]]
) -> Tuple[int, int, int]:
"""
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
@@ -476,11 +430,9 @@ def poor_mans_du(
smallest_layer = min(smallest_layer, size)
else:
smallest_layer = size
if verbose:
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size} ({human_bytes(size)})")
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}")
if verbose:
log.info(f"{tenant_id}/{timeline_id}: sum {total} ({human_bytes(total)})")
log.info(f"{tenant_id}/{timeline_id}: sum {total}")
total_on_disk += total
assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -16,13 +17,15 @@ from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
@@ -52,7 +55,7 @@ def test_metric_collection(
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
@@ -65,14 +68,6 @@ def test_metric_collection(
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -103,14 +98,17 @@ def test_metric_collection(
total += sample[2]
return int(total)
# upload some data to remote storage
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = 0
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
@@ -213,14 +211,6 @@ def test_metric_collection_cleans_up_tempfile(
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

View File

@@ -30,7 +30,9 @@ from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
)
def test_tenant_delete_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
@@ -142,12 +144,18 @@ FAILPOINTS_BEFORE_BACKGROUND = [
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in FAILPOINTS:
if remote_storage_kind is RemoteStorageKind.NOOP and delete_failpoint in (
"timeline-delete-before-index-delete",
):
# the above failpoint are not relevant for config without remote storage
continue
# Simulate failures for only one type of remote storage
# to avoid log pollution and make tests run faster
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
@@ -207,18 +215,21 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
else:
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
ps_http.configure_failpoints((failpoint, "return"))
@@ -249,7 +260,12 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
env.pageserver.stop()
env.pageserver.start()
if failpoint in (
if (
remote_storage_kind is RemoteStorageKind.NOOP
and failpoint == "tenant-delete-before-create-local-mark"
):
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
elif failpoint in (
"tenant-delete-before-shutdown",
"tenant-delete-before-create-remote-mark",
):

View File

@@ -519,8 +519,11 @@ def test_detach_while_attaching(
# * restart the pageserver and verify that ignored tenant is still not loaded
# * `load` the same tenant
# * ensure that it's status is `Active` and it's present in pageserver's memory with all timelines
def test_ignored_tenant_reattach(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3])
def test_ignored_tenant_reattach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -12,6 +12,7 @@ from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
PAGESERVER_PER_TENANT_METRICS,
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
parse_metrics,
)
from fixtures.neon_fixtures import (
@@ -231,10 +232,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
assert value
def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"remote_storage_kind",
# exercise both the code paths where remote_storage=None and remote_storage=Some(...)
[RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3],
)
def test_pageserver_metrics_removed_after_detach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
"""Tests that when a tenant is detached, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_safekeepers = 3
@@ -274,6 +282,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
for tenant in [tenant_1, tenant_2]:
pre_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
expected = set(PAGESERVER_PER_TENANT_METRICS)
if remote_storage_kind == RemoteStorageKind.NOOP:
# if there's no remote storage configured, we don't expose the remote timeline client metrics
expected -= set(PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS)
assert pre_detach_samples == expected
env.pageserver.http_client().tenant_detach(tenant)
@@ -283,7 +294,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):

View File

@@ -12,6 +12,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
@@ -144,12 +145,19 @@ DELETE_FAILPOINTS = [
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in DELETE_FAILPOINTS:
if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
"timeline-delete-before-index-delete",
"timeline-delete-after-index-delete",
):
# the above failpoints are not relevant for config without remote storage
continue
result.append((remote_storage_kind, delete_failpoint))
return result
@@ -197,21 +205,23 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline_id)
else:
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
env.pageserver.allowed_errors.append(f".*{timeline_id}.*failpoint: {failpoint}")
# It appears when we stopped flush loop during deletion and then pageserver is stopped

View File

@@ -301,8 +301,12 @@ def test_timeline_initial_logical_size_calculation_cancellation(
# message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"
def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_init(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -333,12 +337,17 @@ def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_checkpoint(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -360,14 +369,19 @@ def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder
def check():
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
wait_until(10, 1, check)
def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_compaction(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Disable background compaction as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
@@ -406,15 +420,21 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_gc(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
@@ -451,10 +471,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
@@ -538,10 +560,14 @@ def test_timeline_size_metrics(
assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)
def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_tenant_physical_size(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
random.seed(100)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -549,10 +575,12 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
def get_timeline_resident_physical_size(timeline: TimelineId):
sizes = get_physical_size_values(env, tenant, timeline)
assert_physical_size_invariants(sizes)
sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
assert_physical_size_invariants(sizes, remote_storage_kind)
return sizes.prometheus_resident_physical
timeline_total_resident_physical_size = get_timeline_resident_physical_size(timeline)
@@ -572,7 +600,8 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
wait_for_last_flush_lsn(env, endpoint, tenant, timeline)
pageserver_http.timeline_checkpoint(tenant, timeline)
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
@@ -601,6 +630,7 @@ def get_physical_size_values(
env: NeonEnv,
tenant_id: TenantId,
timeline_id: TimelineId,
remote_storage_kind: Optional[RemoteStorageKind],
) -> TimelinePhysicalSizeValues:
res = TimelinePhysicalSizeValues()
@@ -616,9 +646,12 @@ def get_physical_size_values(
res.prometheus_resident_physical = metrics.query_one(
"pageserver_resident_physical_size", metrics_filter
).value
res.prometheus_remote_physical = metrics.query_one(
"pageserver_remote_physical_size", metrics_filter
).value
if remote_storage_kind is not None:
res.prometheus_remote_physical = metrics.query_one(
"pageserver_remote_physical_size", metrics_filter
).value
else:
res.prometheus_remote_physical = None
detail = client.timeline_detail(
tenant_id, timeline_id, include_timeline_dir_layer_file_size_sum=True
@@ -631,15 +664,20 @@ def get_physical_size_values(
return res
def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
def assert_physical_size_invariants(
sizes: TimelinePhysicalSizeValues, remote_storage_kind: Optional[RemoteStorageKind]
):
# resident phyiscal size is defined as
assert sizes.python_timelinedir_layerfiles_physical == sizes.prometheus_resident_physical
assert sizes.python_timelinedir_layerfiles_physical == sizes.layer_map_file_size_sum
# we don't do layer eviction, so, all layers are resident
assert sizes.api_current_physical == sizes.prometheus_resident_physical
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
if remote_storage_kind is not None:
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
else:
assert sizes.prometheus_remote_physical is None
# Timeline logical size initialization is an asynchronous background task that runs once,