mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-22 21:02:56 +00:00
Compare commits
6 Commits
proxy-prot
...
silence_si
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57a9d7d1e2 | ||
|
|
fcc3da7642 | ||
|
|
8449572c0f | ||
|
|
45582a0ec4 | ||
|
|
c3a027b8b4 | ||
|
|
4feb12548b |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -834,7 +834,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.17.12
|
||||
VM_BUILDER_VERSION: v0.17.11
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3246,7 +3246,6 @@ dependencies = [
|
||||
"reqwest-tracing",
|
||||
"routerify",
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"scopeguard",
|
||||
|
||||
@@ -107,7 +107,6 @@ reqwest-middleware = "0.2.0"
|
||||
reqwest-retry = "0.2.2"
|
||||
routerify = "3"
|
||||
rpds = "0.13"
|
||||
rustc-hash = "1.1.0"
|
||||
rustls = "0.21"
|
||||
rustls-pemfile = "1"
|
||||
rustls-split = "0.3"
|
||||
|
||||
@@ -1,599 +0,0 @@
|
||||
# Seamless tenant migration
|
||||
|
||||
- Author: john@neon.tech
|
||||
- Created on 2023-08-11
|
||||
- Implemented on ..
|
||||
|
||||
## Summary
|
||||
|
||||
The preceding [generation numbers RFC](025-generation-numbers.md) may be thought of as "making tenant
|
||||
migration safe". Following that,
|
||||
this RFC is about how those migrations are to be done:
|
||||
|
||||
1. Seamlessly (without interruption to client availability)
|
||||
2. Quickly (enabling faster operations)
|
||||
3. Efficiently (minimizing I/O and $ cost)
|
||||
|
||||
These points are in priority order: if we have to sacrifice
|
||||
efficiency to make a migration seamless for clients, we will
|
||||
do so, etc.
|
||||
|
||||
This is accomplished by introducing two high level changes:
|
||||
|
||||
- A dual-attached state for tenants, used in a control-plane-orchestrated
|
||||
migration procedure that preserves availability during a migration.
|
||||
- Warm secondary locations for tenants, where on-disk content is primed
|
||||
for a fast migration of the tenant from its current attachment to this
|
||||
secondary location.
|
||||
|
||||
## Motivation
|
||||
|
||||
Migrating tenants between pageservers is essential to operating a service
|
||||
at scale, in several contexts:
|
||||
|
||||
1. Responding to a pageserver node failure by migrating tenants to other pageservers
|
||||
2. Balancing load and capacity across pageservers, for example when a user expands their
|
||||
database and they need to migrate to a pageserver with more capacity.
|
||||
3. Restarting pageservers for upgrades and maintenance
|
||||
|
||||
The current situation steps for migration are:
|
||||
|
||||
- detach from old node; skip if old node is dead; (the [skip part is still WIP](https://github.com/neondatabase/cloud/issues/5426)).
|
||||
- attach to new node
|
||||
- re-configure endpoints to use the new node
|
||||
|
||||
Once [generation numbers](025-generation-numbers.md) are implemented,
|
||||
the detach step is no longer critical for correctness. So, we can
|
||||
|
||||
- attach to a new node,
|
||||
- re-configure endpoints to use the new node, and then
|
||||
- detach from the old node.
|
||||
|
||||
However, this still does not meet our seamless/fast/efficient goals:
|
||||
|
||||
- Not fast: The new node will have to download potentially large amounts
|
||||
of data from S3, which may take many minutes.
|
||||
- Not seamless: If we attach to a new pageserver before detaching an old one,
|
||||
the new one might delete some objects that interrupt availability of reads on the old one.
|
||||
- Not efficient: the old pageserver will continue uploading
|
||||
S3 content during the migration that will never be read.
|
||||
|
||||
The user expectations for availability are:
|
||||
|
||||
- For planned maintenance, there should be zero availability
|
||||
gap. This expectation is fulfilled by this RFC.
|
||||
- For unplanned changes (e.g. node failures), there should be
|
||||
minimal availability gap. This RFC provides the _mechanism_
|
||||
to fail over quickly, but does not provide the failure _detection_
|
||||
nor failover _policy_.
|
||||
|
||||
## Non Goals
|
||||
|
||||
- Defining service tiers with different storage strategies: the same
|
||||
level of HA & overhead will apply to all tenants. This doesn't rule out
|
||||
adding such tiers in future.
|
||||
- Enabling pageserver failover in the absence of a control plane: the control
|
||||
plane will remain the source of truth for what should be attached where.
|
||||
- Totally avoiding availability gaps on unplanned migrations during
|
||||
a failure (we expect a small, bounded window of
|
||||
read unavailability of very recent LSNs)
|
||||
- Workload balancing: this RFC defines the mechanism for moving tenants
|
||||
around, not the higher level logic for deciding who goes where.
|
||||
- Defining all possible configuration flows for tenants: the migration process
|
||||
defined in this RFC demonstrates the sufficiency of the pageserver API, but
|
||||
is not the only kind of configuration change the control plane will ever do.
|
||||
The APIs defined here should let the control plane move tenants around in
|
||||
whatever way is needed while preserving data safety and read availability.
|
||||
|
||||
## Impacted components
|
||||
|
||||
Pageserver, control plane
|
||||
|
||||
## Terminology
|
||||
|
||||
- **Attachment**: a tenant is _attached_ to a pageserver if it has
|
||||
been issued a generation number, and is running an instance of
|
||||
the `Tenant` type, ingesting the WAL, and available to serve
|
||||
page reads.
|
||||
- **Location**: locations are a superset of attachments. A location
|
||||
is a combination of a tenant and a pageserver. We may _attach_ at a _location_.
|
||||
|
||||
- **Secondary location**: a location which is not currently attached.
|
||||
- **Warm secondary location**: a location which is not currently attached, but is endeavoring to maintain a warm local cache of layers. We avoid calling this a _warm standby_ to avoid confusion with similar postgres features.
|
||||
|
||||
## Implementation (high level)
|
||||
|
||||
### Warm secondary locations
|
||||
|
||||
To enable faster migrations, we will identify at least one _secondary location_
|
||||
for each tenant. This secondary location will keep a warm cache of layers
|
||||
for the tenant, so that if it is later attached, it can catch up with the
|
||||
latest LSN quickly: rather than downloading everything, it only has to replay
|
||||
the recent part of the WAL to advance from the remote_consistent_offset to the
|
||||
most recent LSN in the WAL.
|
||||
|
||||
The control plane is responsible for selecting secondary locations, and
|
||||
calling into pageservers to configure tenants into a secondary mode at this
|
||||
new location, as well as attaching the tenant in its existing primary location.
|
||||
|
||||
The attached pageserver for a tenant will publish a [layer heatmap](#layer-heatmap)
|
||||
to advise secondaries of which layers should be downloaded.
|
||||
|
||||
### Location modes
|
||||
|
||||
Currently, we consider a tenant to be in one of two states on a pageserver:
|
||||
|
||||
- Attached: active `Tenant` object, and layers on local disk
|
||||
- Detached: no layers on local disk, no runtime state.
|
||||
|
||||
We will extend this with finer-grained modes, whose purpose will become
|
||||
clear in later sections:
|
||||
|
||||
- **AttachedSingle**: equivalent the existing attached state.
|
||||
- **AttachedMulti**: like AttachedSingle, holds an up to date generation, but
|
||||
does not do deletions.
|
||||
- **AttachedStale**: like AttachedSingle, holds a stale generation,
|
||||
do not do any remote storage operations.
|
||||
- **Secondary**: keep local state on disk, periodically update from S3.
|
||||
- **Detached**: equivalent to existing detached state.
|
||||
|
||||
To control these finer grained states, a new pageserver API endpoint will be added.
|
||||
|
||||
### Cutover procedure
|
||||
|
||||
Define old location and new location as "Node A" and "Node B". Consider
|
||||
the case where both nodes are available, and Node B was previously configured
|
||||
as a secondary location for the tenant we are migrating.
|
||||
|
||||
The cutover procedure is orchestrated by the control plane, calling into
|
||||
the pageservers' APIs:
|
||||
|
||||
1. Call to Node A requesting it to flush to S3 and enter AttachedStale state
|
||||
2. Increment generation, and call to Node B requesting it to enter AttachedMulti
|
||||
state with the new generation.
|
||||
3. Call to Node B, requesting it to download the latest hot layers from remote storage,
|
||||
according to the latest heatmap flushed by Node A.
|
||||
4. Wait for Node B's WAL ingestion to catch up with node A's
|
||||
5. Update endpoints to use node B instead of node A
|
||||
6. Call to node B requesting it to enter state AttachedSingle.
|
||||
7. Call to node A requesting it to enter state Secondary
|
||||
|
||||
The following table summarizes how the state of the system advances:
|
||||
|
||||
| Step | Node A | Node B | Node used by endpoints |
|
||||
| :-----------: | :------------: | :------------: | :--------------------: |
|
||||
| 1 (_initial_) | AttachedSingle | Secondary | A |
|
||||
| 2 | AttachedStale | AttachedMulti | A |
|
||||
| 3 | AttachedStale | AttachedMulti | A |
|
||||
| 4 | AttachedStale | AttachedMulti | A |
|
||||
| 5 (_cutover_) | AttachedStale | AttachedMulti | B |
|
||||
| 6 | AttachedStale | AttachedSingle | B |
|
||||
| 7 (_final_) | Secondary | AttachedSingle | B |
|
||||
|
||||
The procedure described for a clean handover from a live node to a secondary
|
||||
is also used for failure cases and for migrations to a location that is not
|
||||
configured as a secondary, by simply skipping irrelevant steps, as described in
|
||||
the following sections.
|
||||
|
||||
#### Migration from an unresponsive node
|
||||
|
||||
If node A is unavailable, then all calls into
|
||||
node A are skipped and we don't wait for B to catch up before
|
||||
switching updating the endpoints to use B.
|
||||
|
||||
#### Migration to a location that is not a secondary
|
||||
|
||||
If node B is initially in Detached state, the procedure is identical. Since Node B
|
||||
is coming from a Detached state rather than Secondary, the download of layers and
|
||||
catch up with WAL will take much longer.
|
||||
|
||||
We might do this if:
|
||||
|
||||
- Attached and secondary locations are both critically low on disk, and we need
|
||||
to migrate to a third node with more resources available.
|
||||
- We are migrating a tenant which does not use secondary locations to save on cost.
|
||||
|
||||
#### Permanent migration away from a node
|
||||
|
||||
In the final step of the migration, we generally request the original node to enter a Secondary
|
||||
state. This is typical if we are doing a planned migration during maintenance, or to
|
||||
balance CPU/network load away from a node.
|
||||
|
||||
One might also want to permanently migrate away: this can be done by simply removing the secondary
|
||||
location after the migration is complete, or as an optimization by substituting the Detached state
|
||||
for the Secondary state in the final step.
|
||||
|
||||
#### Cutover diagram
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant CP as Control plane
|
||||
participant A as Node A
|
||||
participant B as Node B
|
||||
participant E as Endpoint
|
||||
|
||||
CP->>A: PUT Flush & go to AttachedStale
|
||||
note right of A: A continues to ingest WAL
|
||||
CP->>B: PUT AttachedMulti
|
||||
CP->>B: PUT Download layers from latest heatmap
|
||||
note right of B: B downloads from S3
|
||||
loop Poll until download complete
|
||||
CP->>B: GET download status
|
||||
end
|
||||
activate B
|
||||
note right of B: B ingests WAL
|
||||
loop Poll until catch up
|
||||
CP->>B: GET visible WAL
|
||||
CP->>A: GET visible WAL
|
||||
end
|
||||
deactivate B
|
||||
CP->>E: Configure to use Node B
|
||||
E->>B: Connect for reads
|
||||
CP->>B: PUT AttachedSingle
|
||||
CP->>A: PUT Secondary
|
||||
```
|
||||
|
||||
#### Cutover from an unavailable pageserver
|
||||
|
||||
This case is far simpler: we may skip straight to our intended
|
||||
end state.
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant A as Node A
|
||||
participant CP as Control plane
|
||||
participant B as Node B
|
||||
participant E as Endpoint
|
||||
|
||||
note right of A: Node A offline
|
||||
activate A
|
||||
CP->>B: PUT AttachedSingle
|
||||
CP->>E: Configure to use Node B
|
||||
E->>B: Connect for reads
|
||||
deactivate A
|
||||
```
|
||||
|
||||
## Implementation (detail)
|
||||
|
||||
### Purpose of AttachedMulti, AttachedStale
|
||||
|
||||
#### AttachedMulti
|
||||
|
||||
Ordinarily, an attached pageserver whose generation is the latest may delete
|
||||
layers at will (e.g. during compaction). If a previous generation pageserver
|
||||
is also still attached, and in use by endpoints, then this layer deletion could
|
||||
lead to a loss of availability for the endpoint when reading from the previous
|
||||
generation pageserver.
|
||||
|
||||
The _AttachedMulti_ state simply disables deletions. These will be enqueued
|
||||
in `RemoteTimelineClient` until the control plane transitions the
|
||||
node into AttachedSingle, which unblocks deletions. Other remote storage operations
|
||||
such as uploads are not blocked.
|
||||
|
||||
AttachedMulti is not required for data safety, only to preserve availability
|
||||
on pageservers running with stale generations.
|
||||
|
||||
A node enters AttachedMulti only when explicitly asked to by the control plane. It should
|
||||
only remain in this state for the duration of a migration.
|
||||
|
||||
If a control plane bug leaves
|
||||
the node in AttachedMulti for a long time, then we must avoid unbounded memory use from enqueued
|
||||
deletions. This may be accomplished simply, by dropping enqueued deletions when some modest
|
||||
threshold of delayed deletions (e.g. 10k layers per tenant) is reached. As with all deletions,
|
||||
it is safe to skip them, and the leaked objects will be eventually cleaned up by scrub or
|
||||
by timeline deletion.
|
||||
|
||||
During AttachedMulti, the Tenant is free to drop layers from local disk in response to
|
||||
disk pressure: only the deletion of remote layers is blocked.
|
||||
|
||||
#### AttachedStale
|
||||
|
||||
Currently, a pageserver with a stale generation number will continue to
|
||||
upload layers, but be prevented from completing deletions. This is safe, but inefficient: layers uploaded by this stale generation
|
||||
will not be read back by future generations of pageservers.
|
||||
|
||||
The _AttachedStale_ state disables S3 uploads. The stale pageserver
|
||||
will continue to ingest the WAL and write layers to local disk, but not to
|
||||
do any uploads to S3.
|
||||
|
||||
A node may enter AttachedStale in two ways:
|
||||
|
||||
- Explicitly, when control plane calls into the node at the start of a migration.
|
||||
- Implicitly, when the node tries to validate some deletions and discovers
|
||||
that its generation is stale.
|
||||
|
||||
The AttachedStale state also disables sending consumption metrics from
|
||||
that location: it is interpreted as an indication that some other pageserver
|
||||
is already attached or is about to be attached, and that new pageserver will
|
||||
be responsible for sending consumption metrics.
|
||||
|
||||
#### Disk Pressure & AttachedStale
|
||||
|
||||
Over long periods of time, a tenant location in AttachedStale will accumulate data
|
||||
on local disk, as it cannot evict any layers written since it entered the
|
||||
AttachStale state. We rely on the control plane to revert the location to
|
||||
Secondary or Detached at the end of a migration.
|
||||
|
||||
This scenario is particularly noteworthy when evacuating all tenants on a pageserver:
|
||||
since _all_ the attached tenants will go into AttachedStale, we will be doing no
|
||||
uploads at all, therefore ingested data will cause disk usage to increase continuously.
|
||||
Under nominal conditions, the available disk space on pageservers should be sufficient
|
||||
to complete the evacuation before this becomes a problem, but we must also handle
|
||||
the case where we hit a low disk situation while in this state.
|
||||
|
||||
The concept of disk pressure already exists in the pageserver: the `disk_usage_eviction_task`
|
||||
touches each Tenant when it determines that a low-disk condition requires
|
||||
some layer eviction. Having selected layers for eviction, the eviction
|
||||
task calls `Timeline::evict_layers`.
|
||||
|
||||
**Safety**: If evict_layers is called while in AttachedStale state, and some of the to-be-evicted
|
||||
layers are not yet uploaded to S3, then the block on uploads will be lifted. This
|
||||
will result in leaking some objects once a migration is complete, but will enable
|
||||
the node to manage its disk space properly: if a node is left with some tenants
|
||||
in AttachedStale indefinitely due to a network partition or control plane bug,
|
||||
these tenants will not cause a full disk condition.
|
||||
|
||||
### Warm secondary updates
|
||||
|
||||
#### Layer heatmap
|
||||
|
||||
The secondary location's job is to serve reads **with the same quality of service as the original location
|
||||
was serving them around the time of a migration**. This does not mean the secondary
|
||||
location needs the whole set of layers: inactive layers that might soon
|
||||
be evicted on the attached pageserver need not be downloaded by the
|
||||
secondary. A totally idle tenant only needs to maintain enough on-disk
|
||||
state to enable a fast cold start (i.e. the most recent image layers are
|
||||
typically sufficient).
|
||||
|
||||
To enable this, we introduce the concept of a _layer heatmap_, which
|
||||
acts as an advisory input to secondary locations to decide which
|
||||
layers to download from S3.
|
||||
|
||||
#### Attached pageserver
|
||||
|
||||
The attached pageserver, if in state AttachedSingle, periodically
|
||||
uploads a serialized heat map to S3. It may skip this if there
|
||||
is no change since the last time it uploaded (e.g. if the tenant
|
||||
is totally idle).
|
||||
|
||||
Additionally, when the tenant is flushed to remote storage prior to a migration
|
||||
(the first step in [cutover procedure](#cutover-procedure)),
|
||||
the heatmap is written out. This enables a future attached pageserver
|
||||
to get an up to date view when deciding which layers to download.
|
||||
|
||||
#### Secondary location behavior
|
||||
|
||||
Secondary warm locations run a simple loop, implemented separately from
|
||||
the main `Tenant` type, which represents attached tenants:
|
||||
|
||||
- Download the layer heatmap
|
||||
- Select any "hot enough" layers to download, if there is sufficient
|
||||
free disk space.
|
||||
- Download layers, if they were not previously evicted (see below)
|
||||
- Download the latest index_part.json
|
||||
- Check if any layers currently on disk are no longer referenced by
|
||||
IndexPart & delete them
|
||||
|
||||
Note that the heatmap is only advisory: if a secondary location has plenty
|
||||
of disk space, it may choose to retain layers that aren't referenced
|
||||
by the heatmap, as long as they are still referenced by the IndexPart. Conversely,
|
||||
if a node is very low on disk space, it might opt to raise the heat threshold required
|
||||
to both downloading a layer, until more disk space is available.
|
||||
|
||||
#### Secondary locations & disk pressure
|
||||
|
||||
Secondary locations are subject to eviction on disk pressure, just as
|
||||
attached locations are. For eviction purposes, the access time of a
|
||||
layer in a secondary location will be the access time given in the heatmap,
|
||||
rather than the literal time at which the local layer file was accessed.
|
||||
|
||||
The heatmap will indicate which layers are in local storage on the attached
|
||||
location. The secondary will always attempt to get back to having that
|
||||
set of layers on disk, but to avoid flapping, it will remember the access
|
||||
time of the layer it was most recently asked to evict, and layers whose
|
||||
access time is below that will not be re-downloaded.
|
||||
|
||||
The resulting behavior is that after a layer is evicted from a secondary
|
||||
location, it is only re-downloaded once the attached pageserver accesses
|
||||
the layer and uploads a heatmap reflecting that access time. On a pageserver
|
||||
restart, the secondary location will attempt to download all layers in
|
||||
the heatmap again, if they are not on local disk.
|
||||
|
||||
This behavior will be slightly different when secondary locations are
|
||||
used for "low energy tenants", but that is beyond the scope of this RFC.
|
||||
|
||||
### Location configuration API
|
||||
|
||||
Currently, the `/tenant/<tenant_id>/config` API defines various
|
||||
tunables like compaction settings, which apply to the tenant irrespective
|
||||
of which pageserver it is running on.
|
||||
|
||||
A new "location config" structure will be introduced, which defines
|
||||
configuration which is per-tenant, but local to a particular pageserver,
|
||||
such as the attachment mode and whether it is a secondary.
|
||||
|
||||
The pageserver will expose a new per-tenant API for setting
|
||||
the state: `/tenant/<tenant_id>/location/config`.
|
||||
|
||||
Body content:
|
||||
|
||||
```
|
||||
{
|
||||
state: 'enum{Detached, Secondary, AttachedSingle, AttachedMulti, AttachedStale}',
|
||||
generation: Option<u32>,
|
||||
configuration: `Option<TenantConfig>`
|
||||
flush: bool
|
||||
}
|
||||
```
|
||||
|
||||
Existing `/attach` and `/detach` endpoint will have the same
|
||||
behavior as calling `/location/config` with `AttachedSingle` and `Detached`
|
||||
states respectively. These endpoints will be deprecated and later
|
||||
removed.
|
||||
|
||||
The generation attribute is mandatory for entering `AttachedSingle` or
|
||||
`AttachedMulti`.
|
||||
|
||||
The configuration attribute is mandatory when entering any state other
|
||||
than `Detached`. This configuration is the same as the body for
|
||||
the existing `/tenant/<tenant_id>/config` endpoint.
|
||||
|
||||
The `flush` argument indicates whether the pageservers should flush
|
||||
to S3 before proceeding: this only has any effect if the node is
|
||||
currently in AttachedSingle or AttachedMulti. This is used
|
||||
during the first phase of migration, when transitioning the
|
||||
old pageserver to AttachedSingle.
|
||||
|
||||
The `/re-attach` API response will be extended to include a `state` as
|
||||
well as a `generation`, enabling the pageserver to enter the
|
||||
correct state for each tenant on startup.
|
||||
|
||||
### Database schema for locations
|
||||
|
||||
A new table `ProjectLocation`:
|
||||
|
||||
- pageserver_id: int
|
||||
- tenant_id: TenantId
|
||||
- generation: Option<int>
|
||||
- state: `enum(Secondary, AttachedSingle, AttachedMulti)`
|
||||
|
||||
Notes:
|
||||
|
||||
- It is legacy for a Project to have zero `ProjectLocation`s
|
||||
- The `pageserver` column in `Project` now means "to which pageserver should
|
||||
endpoints connect", rather than simply which pageserver is attached.
|
||||
- The `generation` column in `Project` remains, and is incremented and used
|
||||
to set the generation of `ProjectLocation` rows when they are set into
|
||||
an attached state.
|
||||
- The `Detached` state is implicitly represented as the absence of
|
||||
a `ProjectLocation`.
|
||||
|
||||
### Executing migrations
|
||||
|
||||
Migrations will be implemented as Go functions, within the
|
||||
existing `Operation` framework in the control plane. These
|
||||
operations are persistent, such that they will always keep
|
||||
trying until completion: this property is important to avoid
|
||||
leaving garbage behind on pageservers, such as AttachedStale
|
||||
locations.
|
||||
|
||||
### Recovery from failures during migration
|
||||
|
||||
During migration, the control plane may encounter failures of either
|
||||
the original or new pageserver, or both:
|
||||
|
||||
- If the original fails, skip past waiting for the new pageserver
|
||||
to catch up, and put it into AttachedSingle immediately.
|
||||
- If the new node fails, put the old pageserver into Secondary
|
||||
and then back into AttachedSingle (this has the effect of
|
||||
retaining on-disk state and granting it a fresh generation number).
|
||||
- If both nodes fail, keep trying until one of them is available
|
||||
again.
|
||||
|
||||
### Control plane -> Pageserver reconciliation
|
||||
|
||||
A migration may be done while the old node is unavailable,
|
||||
in which case the old node may still be running in an AttachedStale
|
||||
state.
|
||||
|
||||
In this case, it is undesirable to have the migration `Operation`
|
||||
stay alive until the old node eventually comes back online
|
||||
and can be cleaned up. To handle this, the control plane
|
||||
should run a background reconciliation process to compare
|
||||
a pageserver's attachments with the database, and clean up
|
||||
any that shouldn't be there any more.
|
||||
|
||||
Note that there will be no work to do if the old node was really
|
||||
offline, as during startup it will call into `/re-attach` and
|
||||
be updated that way. The reconciliation will only be needed
|
||||
if the node was unavailable but still running.
|
||||
|
||||
## Alternatives considered
|
||||
|
||||
### Only enabling secondary locations for tenants on a higher service tier
|
||||
|
||||
This will make sense in future, especially for tiny databases that may be
|
||||
downloaded from S3 in milliseconds when needed.
|
||||
|
||||
However, it is not wise to do it immediately, because pageservers contain
|
||||
a mixture of higher and lower tier workloads. If we had 1 tenant with
|
||||
a secondary location and 9 without, then those other 9 tenants will do
|
||||
a lot of I/O as they try to recover from S3, which may degrade the
|
||||
service of the tenant which had a secondary location.
|
||||
|
||||
Until we segregate tenant on different service tiers on different pageserver
|
||||
nodes, or implement & test QoS to ensure that tenants with secondaries are
|
||||
not harmed by tenants without, we should use the same failover approach
|
||||
for all the tenants.
|
||||
|
||||
### Hot secondary locations (continuous WAL replay)
|
||||
|
||||
Instead of secondary locations populating their caches from S3, we could
|
||||
have them consume the WAL from safekeepers. The downsides of this would be:
|
||||
|
||||
- Double load on safekeepers, which are a less scalable service than S3
|
||||
- Secondary locations' on-disk state would end up subtly different to
|
||||
the remote state, which would make synchronizing with S3 more complex/expensive
|
||||
when going into attached state.
|
||||
|
||||
The downside of only updating secondary locations from S3 is that we will
|
||||
have a delay during migration from replaying the LSN range between what's
|
||||
in S3 and what's in the pageserver. This range will be very small on
|
||||
planned migrations, as we have the old pageserver flush to S3 immediately
|
||||
before attaching the new pageserver. On unplanned migrations (old pageserver
|
||||
is unavailable), the range of LSNs to replay is bounded by the flush frequency
|
||||
on the old pageserver. However, the migration doesn't have to wait for the
|
||||
replay: it's just that not-yet-replayed LSNs will be unavailable for read
|
||||
until the new pageserver catches up.
|
||||
|
||||
We expect that pageserver reads of the most recent LSNs will be relatively
|
||||
rare, as for an active endpoint those pages will usually still be in the postgres
|
||||
page cache: this leads us to prefer synchronizing from S3 on secondary
|
||||
locations, rather than consuming the WAL from safekeepers.
|
||||
|
||||
### Cold secondary locations
|
||||
|
||||
It is not functionally necessary to keep warm caches on secondary locations at all. However, if we do not, then
|
||||
we would experience a de-facto availability loss in unplanned migrations, as reads to the new node would take an extremely long time (many seconds, perhaps minutes).
|
||||
|
||||
Warm caches on secondary locations are necessary to meet
|
||||
our availability goals.
|
||||
|
||||
### Pageserver-granularity failover
|
||||
|
||||
Instead of migrating tenants individually, we could have entire spare nodes,
|
||||
and on a node death, move all its work to one of these spares.
|
||||
|
||||
This approach is avoided for several reasons:
|
||||
|
||||
- we would still need fine-grained tenant migration for other
|
||||
purposes such as balancing load
|
||||
- by sharing the spare capacity over many peers rather than one spare node,
|
||||
these peers may use the capacity for other purposes, until it is needed
|
||||
to handle migrated tenants. e.g. for keeping a deeper cache of their
|
||||
attached tenants.
|
||||
|
||||
### Readonly during migration
|
||||
|
||||
We could simplify migrations by making both previous and new nodes go into a
|
||||
readonly state, then flush remote content from the previous node, then activate
|
||||
attachment on the secondary node.
|
||||
|
||||
The downside to this approach is a potentially large gap in readability of
|
||||
recent LSNs while loading data onto the new node. To avoid this, it is worthwhile
|
||||
to incur the extra cost of double-replaying the WAL onto old and new nodes' local
|
||||
storage during a migration.
|
||||
|
||||
### Peer-to-peer pageserver communication
|
||||
|
||||
Rather than uploading the heatmap to S3, attached pageservers could make it
|
||||
available to peers.
|
||||
|
||||
Currently, pageservers have no peer to peer communication, so adding this
|
||||
for heatmaps would incur significant overhead in deployment and configuration
|
||||
of the service, and ensuring that when a new pageserver is deployed, other
|
||||
pageservers are updated to be aware of it.
|
||||
|
||||
As well as simplifying implementation, putting heatmaps in S3 will be useful
|
||||
for future analytics purposes -- gathering aggregated statistics on activity
|
||||
pattersn across many tenants may be done directly from data in S3.
|
||||
@@ -107,7 +107,7 @@ pub const CHUNK_SIZE: usize = 1000;
|
||||
|
||||
// Just a wrapper around a slice of events
|
||||
// to serialize it as `{"events" : [ ] }
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
#[derive(serde::Serialize)]
|
||||
pub struct EventChunk<'a, T: Clone> {
|
||||
pub events: std::borrow::Cow<'a, [T]>,
|
||||
}
|
||||
|
||||
@@ -25,7 +25,11 @@ use tokio::io;
|
||||
use toml_edit::Item;
|
||||
use tracing::info;
|
||||
|
||||
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
|
||||
pub use self::{
|
||||
local_fs::LocalFs,
|
||||
s3_bucket::S3Bucket,
|
||||
simulate_failures::{SimulatedError, UnreliableWrapper},
|
||||
};
|
||||
|
||||
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
|
||||
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
|
||||
@@ -190,26 +194,44 @@ impl Debug for Download {
|
||||
#[derive(Debug)]
|
||||
pub enum DownloadError {
|
||||
/// Validation or other error happened due to user input.
|
||||
///
|
||||
/// This is only used by LOCAL_FS.
|
||||
BadInput(anyhow::Error),
|
||||
|
||||
/// The file was not found in the remote storage.
|
||||
///
|
||||
/// This can only happen during download, never during delete.
|
||||
NotFound,
|
||||
/// The file was found in the remote storage, but the download failed.
|
||||
|
||||
/// The file was found in the remote storage, but the operation failed.
|
||||
///
|
||||
/// The error should have context already describing the real failed operation.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for DownloadError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
use DownloadError::*;
|
||||
match self {
|
||||
DownloadError::BadInput(e) => {
|
||||
write!(f, "Failed to download a remote file due to user input: {e}")
|
||||
}
|
||||
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
|
||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
||||
NotFound => write!(f, "No file found for the remote object id given"),
|
||||
// this is same as thiserror error(transparent); it handles {} and {:#}
|
||||
Other(e) | BadInput(e) => std::fmt::Display::fmt(e, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DownloadError {}
|
||||
impl std::error::Error for DownloadError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
use DownloadError::*;
|
||||
match self {
|
||||
NotFound => None,
|
||||
Other(_) | BadInput(_) => {
|
||||
// TODO: these are anyhow, cannot return here
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Every storage, currently supported.
|
||||
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
|
||||
|
||||
@@ -345,7 +345,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.set_max_keys(self.max_keys_per_list_response)
|
||||
.send()
|
||||
.await
|
||||
.context("Failed to list S3 prefixes")
|
||||
.context("list S3 prefixes")
|
||||
.map_err(DownloadError::Other);
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
@@ -397,7 +397,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.set_max_keys(self.max_keys_per_list_response)
|
||||
.send()
|
||||
.await
|
||||
.context("Failed to list files in S3 bucket");
|
||||
.context("list files in S3 bucket");
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
metrics::BUCKET_METRICS
|
||||
@@ -521,10 +521,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
));
|
||||
return Err(anyhow::anyhow!("delete {} objects", errors.len()));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -18,7 +18,7 @@ pub struct UnreliableWrapper {
|
||||
}
|
||||
|
||||
/// Used to identify retries of different unique operation.
|
||||
#[derive(Debug, Hash, Eq, PartialEq)]
|
||||
#[derive(Hash, Eq, PartialEq)]
|
||||
enum RemoteOp {
|
||||
ListPrefixes(Option<RemotePath>),
|
||||
Upload(RemotePath),
|
||||
@@ -27,6 +27,22 @@ enum RemoteOp {
|
||||
DeleteObjects(Vec<RemotePath>),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteOp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
use RemoteOp::*;
|
||||
match self {
|
||||
ListPrefixes(arg0) => f.debug_tuple("ListPrefixes").field(arg0).finish(),
|
||||
Upload(arg0) => f.debug_tuple("Upload").field(arg0).finish(),
|
||||
Download(arg0) => f.debug_tuple("Download").field(arg0).finish(),
|
||||
Delete(arg0) => f.debug_tuple("Delete").field(arg0).finish(),
|
||||
DeleteObjects(many) if many.len() > 3 => {
|
||||
write!(f, "DeleteObjects({} paths)", many.len())
|
||||
}
|
||||
DeleteObjects(few) => f.debug_tuple("DeleteObjects").field(few).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UnreliableWrapper {
|
||||
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
|
||||
assert!(attempts_to_fail > 0);
|
||||
@@ -59,13 +75,12 @@ impl UnreliableWrapper {
|
||||
e.remove();
|
||||
Ok(attempts_before_this)
|
||||
} else {
|
||||
let error =
|
||||
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
||||
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
|
||||
Err(DownloadError::Other(error))
|
||||
}
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
||||
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
|
||||
e.insert(1);
|
||||
Err(DownloadError::Other(error))
|
||||
}
|
||||
@@ -80,6 +95,26 @@ impl UnreliableWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// `pub` type for checking if this is the root cause around logging.
|
||||
///
|
||||
/// This is just a string to avoid cloning a huge number of paths a second time.
|
||||
#[derive(Debug)]
|
||||
pub struct SimulatedError(String);
|
||||
|
||||
impl<'a> From<&'a RemoteOp> for SimulatedError {
|
||||
fn from(value: &'_ RemoteOp) -> Self {
|
||||
SimulatedError(format!("simulated failure of remote operation {:?}", value))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SimulatedError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for SimulatedError {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for UnreliableWrapper {
|
||||
async fn list_prefixes(
|
||||
|
||||
@@ -216,24 +216,6 @@ impl std::fmt::Debug for PrettyLocation<'_, '_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// When you will store a secret but want to make sure it won't
|
||||
/// be accidentally logged, wrap it in a SecretString, whose Debug
|
||||
/// implementation does not expose the contents.
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
pub struct SecretString(String);
|
||||
|
||||
impl SecretString {
|
||||
pub fn get_contents(&self) -> &str {
|
||||
self.0.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SecretString {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "[SECRET]")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use metrics::{core::Opts, IntCounterVec};
|
||||
|
||||
@@ -431,14 +431,14 @@ impl CgroupWatcher {
|
||||
.context("failed to request upscale")?;
|
||||
|
||||
let memory_high =
|
||||
self.get_memory_high_bytes().context("failed to get memory.high")?;
|
||||
self.get_high_bytes().context("failed to get memory.high")?;
|
||||
let new_high = memory_high + self.config.memory_high_increase_by_bytes;
|
||||
info!(
|
||||
current_high_bytes = memory_high,
|
||||
new_high_bytes = new_high,
|
||||
"updating memory.high"
|
||||
);
|
||||
self.set_memory_high_bytes(new_high)
|
||||
self.set_high_bytes(new_high)
|
||||
.context("failed to set memory.high")?;
|
||||
last_memory_high_increase_at = Some(Instant::now());
|
||||
continue;
|
||||
@@ -556,6 +556,14 @@ impl CgroupWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a set of limits we apply to a cgroup to control memory usage.
|
||||
///
|
||||
/// Setting these values also affects the thresholds for receiving usage alerts.
|
||||
#[derive(Debug)]
|
||||
pub struct MemoryLimits {
|
||||
pub high: u64,
|
||||
}
|
||||
|
||||
// Methods for manipulating the actual cgroup
|
||||
impl CgroupWatcher {
|
||||
/// Get a handle on the freezer subsystem.
|
||||
@@ -616,29 +624,50 @@ impl CgroupWatcher {
|
||||
}
|
||||
|
||||
/// Set cgroup memory.high threshold.
|
||||
pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
|
||||
self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
|
||||
}
|
||||
|
||||
/// Set the cgroup's memory.high to 'max', disabling it.
|
||||
pub fn unset_memory_high(&self) -> anyhow::Result<()> {
|
||||
self.set_memory_high_internal(MaxValue::Max)
|
||||
}
|
||||
|
||||
fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
|
||||
pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
|
||||
self.memory()
|
||||
.context("failed to get memory subsystem")?
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
low: None,
|
||||
high: Some(value),
|
||||
high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
|
||||
min: None,
|
||||
max: None,
|
||||
})
|
||||
.map_err(anyhow::Error::from)
|
||||
.context("failed to set memory.high")
|
||||
}
|
||||
|
||||
/// Set cgroup memory.high and memory.max.
|
||||
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
|
||||
info!(limits.high, path = self.path(), "writing new memory limits",);
|
||||
self.memory()
|
||||
.context("failed to get memory subsystem while setting memory limits")?
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
min: None,
|
||||
low: None,
|
||||
high: Some(MaxValue::Value(
|
||||
u64::min(limits.high, i64::MAX as u64) as i64
|
||||
)),
|
||||
max: None,
|
||||
})
|
||||
.context("failed to set memory limits")
|
||||
}
|
||||
|
||||
/// Given some amount of available memory, set the desired cgroup memory limits
|
||||
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
|
||||
let new_high = self.config.calculate_memory_high_value(available_memory);
|
||||
let limits = MemoryLimits { high: new_high };
|
||||
info!(
|
||||
path = self.path(),
|
||||
memory = ?limits,
|
||||
"setting cgroup memory",
|
||||
);
|
||||
self.set_limits(&limits)
|
||||
.context("failed to set cgroup memory limits")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get memory.high threshold.
|
||||
pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> {
|
||||
pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
|
||||
let high = self
|
||||
.memory()
|
||||
.context("failed to get memory subsystem while getting memory statistics")?
|
||||
|
||||
@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::cgroup::{CgroupWatcher, Sequenced};
|
||||
use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
use crate::filecache::{FileCacheConfig, FileCacheState};
|
||||
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
|
||||
@@ -106,51 +106,6 @@ impl Runner {
|
||||
kill,
|
||||
};
|
||||
|
||||
// If we have both the cgroup and file cache integrations enabled, it's possible for
|
||||
// temporary failures to result in cgroup throttling (from memory.high), that in turn makes
|
||||
// it near-impossible to connect to the file cache (because it times out). Unfortunately,
|
||||
// we *do* still want to determine the file cache size before setting the cgroup's
|
||||
// memory.high, so it's not as simple as just swapping the order.
|
||||
//
|
||||
// Instead, the resolution here is that on vm-monitor startup (note: happens on each
|
||||
// connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
|
||||
// temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
|
||||
// of a hacky solution, but helps with reliability.
|
||||
if let Some(name) = &args.cgroup {
|
||||
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
|
||||
// now, and then set limits later.
|
||||
info!("initializing cgroup");
|
||||
|
||||
let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
|
||||
.context("failed to create cgroup manager")?;
|
||||
|
||||
info!("temporarily unsetting memory.high");
|
||||
|
||||
// Temporarily un-set cgroup memory.high; see above.
|
||||
cgroup
|
||||
.unset_memory_high()
|
||||
.context("failed to unset memory.high")?;
|
||||
|
||||
let cgroup = Arc::new(cgroup);
|
||||
|
||||
let cgroup_clone = Arc::clone(&cgroup);
|
||||
spawn_with_cancel(
|
||||
token.clone(),
|
||||
|_| error!("cgroup watcher terminated"),
|
||||
async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
|
||||
);
|
||||
|
||||
state.cgroup = Some(cgroup);
|
||||
} else {
|
||||
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
|
||||
// This allows us to poll it in `Monitor::run` regardless of whether we
|
||||
// are managing a cgroup or not. If we don't forget it, all receives will
|
||||
// immediately return an error because the sender is droped and it will
|
||||
// claim all select! statements, effectively turning `Monitor::run` into
|
||||
// `loop { fail to receive }`.
|
||||
mem::forget(requesting_send);
|
||||
}
|
||||
|
||||
let mut file_cache_reserved_bytes = 0;
|
||||
let mem = get_total_system_memory();
|
||||
|
||||
@@ -164,7 +119,7 @@ impl Runner {
|
||||
false => FileCacheConfig::default_in_memory(),
|
||||
};
|
||||
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token)
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
|
||||
.await
|
||||
.context("failed to create file cache")?;
|
||||
|
||||
@@ -197,15 +152,35 @@ impl Runner {
|
||||
state.filecache = Some(file_cache);
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &state.cgroup {
|
||||
let available = mem - file_cache_reserved_bytes;
|
||||
let value = cgroup.config.calculate_memory_high_value(available);
|
||||
if let Some(name) = &args.cgroup {
|
||||
let (mut cgroup, cgroup_event_stream) =
|
||||
CgroupWatcher::new(name.clone(), requesting_send)
|
||||
.context("failed to create cgroup manager")?;
|
||||
|
||||
info!(value, "setting memory.high");
|
||||
let available = mem - file_cache_reserved_bytes;
|
||||
|
||||
cgroup
|
||||
.set_memory_high_bytes(value)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
.set_memory_limits(available)
|
||||
.context("failed to set cgroup memory limits")?;
|
||||
|
||||
let cgroup = Arc::new(cgroup);
|
||||
|
||||
// Some might call this . . . cgroup v2
|
||||
let cgroup_clone = Arc::clone(&cgroup);
|
||||
|
||||
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
|
||||
cgroup_clone.watch(notified_recv, cgroup_event_stream).await
|
||||
});
|
||||
|
||||
state.cgroup = Some(cgroup);
|
||||
} else {
|
||||
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
|
||||
// This allows us to poll it in `Monitor::run` regardless of whether we
|
||||
// are managing a cgroup or not. If we don't forget it, all receives will
|
||||
// immediately return an error because the sender is droped and it will
|
||||
// claim all select! statements, effectively turning `Monitor::run` into
|
||||
// `loop { fail to receive }`.
|
||||
mem::forget(requesting_send);
|
||||
}
|
||||
|
||||
Ok(state)
|
||||
@@ -282,11 +257,14 @@ impl Runner {
|
||||
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
|
||||
}
|
||||
|
||||
// new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
|
||||
// since it is properly initialized in the previous cgroup if let block
|
||||
let limits = MemoryLimits {
|
||||
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
|
||||
// since it is properly initialized in the previous cgroup if let block
|
||||
high: new_cgroup_mem_high,
|
||||
};
|
||||
cgroup
|
||||
.set_memory_high_bytes(new_cgroup_mem_high)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
.set_limits(&limits)
|
||||
.context("failed to set cgroup memory limits")?;
|
||||
|
||||
let message = format!(
|
||||
"set cgroup memory.high to {} MiB, of new max {} MiB",
|
||||
@@ -349,9 +327,12 @@ impl Runner {
|
||||
name = cgroup.path(),
|
||||
"updating cgroup memory.high",
|
||||
);
|
||||
let limits = MemoryLimits {
|
||||
high: new_cgroup_mem_high,
|
||||
};
|
||||
cgroup
|
||||
.set_memory_high_bytes(new_cgroup_mem_high)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
.set_limits(&limits)
|
||||
.context("failed to set file cache size")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -644,7 +644,7 @@ fn create_remote_storage_client(
|
||||
let config = if let Some(config) = &conf.remote_storage_config {
|
||||
config
|
||||
} else {
|
||||
tracing::warn!("no remote storage configured, this is a deprecated configuration");
|
||||
// No remote storage configured.
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -208,9 +207,6 @@ pub struct PageServerConf {
|
||||
pub background_task_maximum_delay: Duration,
|
||||
|
||||
pub control_plane_api: Option<Url>,
|
||||
|
||||
/// JWT token for use with the control plane API.
|
||||
pub control_plane_api_token: Option<SecretString>,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -287,7 +283,6 @@ struct PageServerConfigBuilder {
|
||||
background_task_maximum_delay: BuilderValue<Duration>,
|
||||
|
||||
control_plane_api: BuilderValue<Option<Url>>,
|
||||
control_plane_api_token: BuilderValue<Option<SecretString>>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -352,7 +347,6 @@ impl Default for PageServerConfigBuilder {
|
||||
.unwrap()),
|
||||
|
||||
control_plane_api: Set(None),
|
||||
control_plane_api_token: Set(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -573,9 +567,6 @@ impl PageServerConfigBuilder {
|
||||
control_plane_api: self
|
||||
.control_plane_api
|
||||
.ok_or(anyhow!("missing control_plane_api"))?,
|
||||
control_plane_api_token: self
|
||||
.control_plane_api_token
|
||||
.ok_or(anyhow!("missing control_plane_api_token"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -954,7 +945,6 @@ impl PageServerConf {
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
background_task_maximum_delay: Duration::ZERO,
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1178,8 +1168,7 @@ background_task_maximum_delay = '334 s'
|
||||
background_task_maximum_delay: humantime::parse_duration(
|
||||
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
|
||||
)?,
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None
|
||||
control_plane_api: None
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1235,8 +1224,7 @@ background_task_maximum_delay = '334 s'
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
background_task_maximum_delay: Duration::from_secs(334),
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None
|
||||
control_plane_api: None
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -53,16 +53,12 @@ impl ControlPlaneClient {
|
||||
segs.pop_if_empty().push("");
|
||||
}
|
||||
|
||||
let mut client = reqwest::ClientBuilder::new();
|
||||
|
||||
if let Some(jwt) = &conf.control_plane_api_token {
|
||||
let mut headers = hyper::HeaderMap::new();
|
||||
headers.insert("Authorization", jwt.get_contents().parse().unwrap());
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client");
|
||||
|
||||
Some(Self {
|
||||
http_client: client.build().expect("Failed to construct HTTP client"),
|
||||
http_client: client,
|
||||
base_url: url,
|
||||
node_id: conf.id,
|
||||
cancel: cancel.clone(),
|
||||
|
||||
@@ -291,14 +291,6 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_resident_physical_size_global",
|
||||
"Like `pageserver_resident_physical_size`, but without tenant/timeline dimensions."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_remote_physical_size",
|
||||
@@ -309,14 +301,6 @@ static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_remote_physical_size_global",
|
||||
"Like `pageserver_remote_physical_size`, but without tenant/timeline dimensions."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static REMOTE_ONDEMAND_DOWNLOADED_LAYERS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_remote_ondemand_downloaded_layers_total",
|
||||
@@ -1225,7 +1209,7 @@ pub struct TimelineMetrics {
|
||||
pub load_layer_map_histo: StorageTimeMetrics,
|
||||
pub garbage_collect_histo: StorageTimeMetrics,
|
||||
pub last_record_gauge: IntGauge,
|
||||
resident_physical_size_gauge: UIntGauge,
|
||||
pub resident_physical_size_gauge: UIntGauge,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
@@ -1303,29 +1287,10 @@ impl TimelineMetrics {
|
||||
}
|
||||
|
||||
pub fn record_new_file_metrics(&self, sz: u64) {
|
||||
self.resident_physical_size_add(sz);
|
||||
self.resident_physical_size_gauge.add(sz);
|
||||
self.num_persistent_files_created.inc_by(1);
|
||||
self.persistent_bytes_written.inc_by(sz);
|
||||
}
|
||||
|
||||
pub fn resident_physical_size_sub(&self, sz: u64) {
|
||||
self.resident_physical_size_gauge.sub(sz);
|
||||
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(sz);
|
||||
}
|
||||
|
||||
pub fn resident_physical_size_add(&self, sz: u64) {
|
||||
self.resident_physical_size_gauge.add(sz);
|
||||
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.add(sz);
|
||||
}
|
||||
|
||||
pub fn resident_physical_size_set(&self, sz: u64) {
|
||||
self.resident_physical_size_gauge.set(sz);
|
||||
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.set(sz);
|
||||
}
|
||||
|
||||
pub fn resident_physical_size_get(&self) -> u64 {
|
||||
self.resident_physical_size_gauge.get()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
@@ -1333,10 +1298,7 @@ impl Drop for TimelineMetrics {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
{
|
||||
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
|
||||
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
}
|
||||
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
@@ -1390,43 +1352,10 @@ use std::time::{Duration, Instant};
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
/// Maintain a per timeline gauge in addition to the global gauge.
|
||||
struct PerTimelineRemotePhysicalSizeGauge {
|
||||
last_set: u64,
|
||||
gauge: UIntGauge,
|
||||
}
|
||||
|
||||
impl PerTimelineRemotePhysicalSizeGauge {
|
||||
fn new(per_timeline_gauge: UIntGauge) -> Self {
|
||||
Self {
|
||||
last_set: per_timeline_gauge.get(),
|
||||
gauge: per_timeline_gauge,
|
||||
}
|
||||
}
|
||||
fn set(&mut self, sz: u64) {
|
||||
self.gauge.set(sz);
|
||||
if sz < self.last_set {
|
||||
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set - sz);
|
||||
} else {
|
||||
REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - self.last_set);
|
||||
};
|
||||
self.last_set = sz;
|
||||
}
|
||||
fn get(&self) -> u64 {
|
||||
self.gauge.get()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PerTimelineRemotePhysicalSizeGauge {
|
||||
fn drop(&mut self) {
|
||||
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RemoteTimelineClientMetrics {
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
|
||||
remote_physical_size_gauge: Mutex<Option<UIntGauge>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
@@ -1444,24 +1373,18 @@ impl RemoteTimelineClientMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_physical_size_set(&self, sz: u64) {
|
||||
pub fn remote_physical_size_gauge(&self) -> UIntGauge {
|
||||
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
|
||||
let gauge = guard.get_or_insert_with(|| {
|
||||
PerTimelineRemotePhysicalSizeGauge::new(
|
||||
guard
|
||||
.get_or_insert_with(|| {
|
||||
REMOTE_PHYSICAL_SIZE
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
])
|
||||
.unwrap(),
|
||||
)
|
||||
});
|
||||
gauge.set(sz);
|
||||
}
|
||||
|
||||
pub(crate) fn remote_physical_size_get(&self) -> u64 {
|
||||
let guard = self.remote_physical_size_gauge.lock().unwrap();
|
||||
guard.as_ref().map(|gauge| gauge.get()).unwrap_or(0)
|
||||
.unwrap()
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn remote_operation_time(
|
||||
|
||||
@@ -453,11 +453,11 @@ impl RemoteTimelineClient {
|
||||
} else {
|
||||
0
|
||||
};
|
||||
self.metrics.remote_physical_size_set(size);
|
||||
self.metrics.remote_physical_size_gauge().set(size);
|
||||
}
|
||||
|
||||
pub fn get_remote_physical_size(&self) -> u64 {
|
||||
self.metrics.remote_physical_size_get()
|
||||
self.metrics.remote_physical_size_gauge().get()
|
||||
}
|
||||
|
||||
//
|
||||
@@ -1161,7 +1161,11 @@ impl RemoteTimelineClient {
|
||||
// at info level at first, and only WARN if the operation fails repeatedly.
|
||||
//
|
||||
// (See similar logic for downloads in `download::download_retry`)
|
||||
if retries < FAILED_UPLOAD_WARN_THRESHOLD {
|
||||
|
||||
let is_simulated = cfg!(feature = "testing")
|
||||
&& e.root_cause().is::<remote_storage::SimulatedError>();
|
||||
|
||||
if retries < FAILED_UPLOAD_WARN_THRESHOLD || is_simulated {
|
||||
info!(
|
||||
"failed to perform remote task {}, will retry (attempt {}): {:#}",
|
||||
task.op, retries, e
|
||||
|
||||
@@ -559,7 +559,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
pub fn resident_physical_size(&self) -> u64 {
|
||||
self.metrics.resident_physical_size_get()
|
||||
self.metrics.resident_physical_size_gauge.get()
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1309,7 +1309,10 @@ impl Timeline {
|
||||
// will treat the file as a local layer again, count it towards resident size,
|
||||
// and it'll be like the layer removal never happened.
|
||||
// The bump in resident size is perhaps unexpected but overall a robust behavior.
|
||||
self.metrics.resident_physical_size_sub(layer_file_size);
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.sub(layer_file_size);
|
||||
|
||||
self.metrics.evictions.inc();
|
||||
|
||||
if let Some(delta) = local_layer_residence_duration {
|
||||
@@ -1843,7 +1846,9 @@ impl Timeline {
|
||||
"loaded layer map with {} layers at {}, total physical size: {}",
|
||||
num_layers, disk_consistent_lsn, total_physical_size
|
||||
);
|
||||
self.metrics.resident_physical_size_set(total_physical_size);
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.set(total_physical_size);
|
||||
|
||||
timer.stop_and_record();
|
||||
Ok(())
|
||||
@@ -4393,7 +4398,7 @@ impl Timeline {
|
||||
|
||||
// XXX the temp file is still around in Err() case
|
||||
// and consumes space until we clean up upon pageserver restart.
|
||||
self_clone.metrics.resident_physical_size_add(*size);
|
||||
self_clone.metrics.resident_physical_size_gauge.add(*size);
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
|
||||
@@ -263,7 +263,7 @@ impl LayerManager {
|
||||
let desc = layer.layer_desc();
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
metrics.resident_physical_size_sub(desc.file_size);
|
||||
metrics.resident_physical_size_gauge.sub(desc.file_size);
|
||||
}
|
||||
|
||||
// TODO Removing from the bottom of the layer map is expensive.
|
||||
|
||||
@@ -1790,14 +1790,6 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
if (!XLogInsertAllowed())
|
||||
return;
|
||||
|
||||
/* ensure we have enough xlog buffers to log max-sized records */
|
||||
XLogEnsureRecordSpace(Min(remblocks, (XLR_MAX_BLOCK_ID - 1)), 0);
|
||||
|
||||
/*
|
||||
* Iterate over all the pages. They are collected into batches of
|
||||
* XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
|
||||
* batch.
|
||||
*/
|
||||
while (remblocks > 0)
|
||||
{
|
||||
int count = Min(remblocks, XLR_MAX_BLOCK_ID);
|
||||
|
||||
@@ -42,7 +42,6 @@ reqwest-middleware.workspace = true
|
||||
reqwest-retry.workspace = true
|
||||
reqwest-tracing.workspace = true
|
||||
routerify.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
|
||||
@@ -17,12 +17,11 @@ use std::{
|
||||
use tokio::time;
|
||||
use tokio_postgres::AsyncMessage;
|
||||
|
||||
use crate::{
|
||||
auth, console,
|
||||
metrics::{Ids, MetricCounter, USAGE_METRICS},
|
||||
};
|
||||
use crate::{auth, console};
|
||||
use crate::{compute, config};
|
||||
|
||||
use super::sql_over_http::MAX_RESPONSE_SIZE;
|
||||
|
||||
use crate::proxy::ConnectMechanism;
|
||||
|
||||
use tracing::{error, warn};
|
||||
@@ -401,6 +400,7 @@ async fn connect_to_compute_once(
|
||||
.user(&conn_info.username)
|
||||
.password(&conn_info.password)
|
||||
.dbname(&conn_info.dbname)
|
||||
.max_backend_message_size(MAX_RESPONSE_SIZE)
|
||||
.connect_timeout(timeout)
|
||||
.connect(tokio_postgres::NoTls)
|
||||
.await?;
|
||||
@@ -412,10 +412,6 @@ async fn connect_to_compute_once(
|
||||
span.in_scope(|| {
|
||||
info!(%conn_info, %session, "new connection");
|
||||
});
|
||||
let ids = Ids {
|
||||
endpoint_id: node_info.aux.endpoint_id.to_string(),
|
||||
branch_id: node_info.aux.branch_id.to_string(),
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
poll_fn(move |cx| {
|
||||
@@ -454,18 +450,10 @@ async fn connect_to_compute_once(
|
||||
Ok(Client {
|
||||
inner: client,
|
||||
session: tx,
|
||||
ids,
|
||||
})
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
pub inner: tokio_postgres::Client,
|
||||
session: tokio::sync::watch::Sender<uuid::Uuid>,
|
||||
ids: Ids,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn metrics(&self) -> Arc<MetricCounter> {
|
||||
USAGE_METRICS.register(self.ids.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,10 @@ use std::sync::Arc;
|
||||
use anyhow::bail;
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use hyper::body::HttpBody;
|
||||
use hyper::header;
|
||||
use hyper::http::HeaderName;
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Response;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, HeaderMap, Request};
|
||||
use serde_json::json;
|
||||
use serde_json::Map;
|
||||
@@ -18,11 +16,7 @@ use tokio_postgres::types::Type;
|
||||
use tokio_postgres::GenericClient;
|
||||
use tokio_postgres::IsolationLevel;
|
||||
use tokio_postgres::Row;
|
||||
use tracing::error;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
@@ -45,6 +39,7 @@ enum Payload {
|
||||
Batch(BatchQueryData),
|
||||
}
|
||||
|
||||
pub const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MB
|
||||
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
@@ -187,45 +182,7 @@ pub async fn handle(
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
|
||||
|
||||
let mut response = match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null,
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
};
|
||||
response.headers_mut().insert(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[instrument(name = "sql-over-http", skip_all)]
|
||||
async fn handle_inner(
|
||||
request: Request<Body>,
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -276,18 +233,13 @@ async fn handle_inner(
|
||||
|
||||
let mut client = conn_pool.get(&conn_info, !allow_pool, session_id).await?;
|
||||
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/json");
|
||||
|
||||
//
|
||||
// Now execute the query and return the result
|
||||
//
|
||||
let mut size = 0;
|
||||
let result = match payload {
|
||||
Payload::Single(query) => {
|
||||
query_to_json(&client.inner, query, &mut size, raw_output, array_mode).await
|
||||
}
|
||||
Payload::Single(query) => query_to_json(&client.inner, query, raw_output, array_mode)
|
||||
.await
|
||||
.map(|x| (x, HashMap::default())),
|
||||
Payload::Batch(batch_query) => {
|
||||
let mut results = Vec::new();
|
||||
let mut builder = client.inner.build_transaction();
|
||||
@@ -302,8 +254,7 @@ async fn handle_inner(
|
||||
}
|
||||
let transaction = builder.start().await?;
|
||||
for query in batch_query.queries {
|
||||
let result =
|
||||
query_to_json(&transaction, query, &mut size, raw_output, array_mode).await;
|
||||
let result = query_to_json(&transaction, query, raw_output, array_mode).await;
|
||||
match result {
|
||||
Ok(r) => results.push(r),
|
||||
Err(e) => {
|
||||
@@ -313,27 +264,26 @@ async fn handle_inner(
|
||||
}
|
||||
}
|
||||
transaction.commit().await?;
|
||||
let mut headers = HashMap::default();
|
||||
if txn_read_only {
|
||||
response = response.header(
|
||||
headers.insert(
|
||||
TXN_READ_ONLY.clone(),
|
||||
HeaderValue::try_from(txn_read_only.to_string())?,
|
||||
);
|
||||
}
|
||||
if txn_deferrable {
|
||||
response = response.header(
|
||||
headers.insert(
|
||||
TXN_DEFERRABLE.clone(),
|
||||
HeaderValue::try_from(txn_deferrable.to_string())?,
|
||||
);
|
||||
}
|
||||
if let Some(txn_isolation_level) = txn_isolation_level_raw {
|
||||
response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
|
||||
headers.insert(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
|
||||
}
|
||||
Ok(json!({ "results": results }))
|
||||
Ok((json!({ "results": results }), headers))
|
||||
}
|
||||
};
|
||||
|
||||
let metrics = client.metrics();
|
||||
|
||||
if allow_pool {
|
||||
let current_span = tracing::Span::current();
|
||||
// return connection to the pool
|
||||
@@ -343,30 +293,12 @@ async fn handle_inner(
|
||||
});
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(value) => {
|
||||
// how could this possibly fail
|
||||
let body = serde_json::to_string(&value).expect("json serialization should not fail");
|
||||
let len = body.len();
|
||||
let response = response
|
||||
.body(Body::from(body))
|
||||
// only fails if invalid status code or invalid header/values are given.
|
||||
// these are not user configurable so it cannot fail dynamically
|
||||
.expect("building response payload should not fail");
|
||||
|
||||
// count the egress bytes - we miss the TLS and header overhead but oh well...
|
||||
// moving this later in the stack is going to be a lot of effort and ehhhh
|
||||
metrics.record_egress(len as u64);
|
||||
Ok(response)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn query_to_json<T: GenericClient>(
|
||||
client: &T,
|
||||
data: QueryData,
|
||||
current_size: &mut usize,
|
||||
raw_output: bool,
|
||||
array_mode: bool,
|
||||
) -> anyhow::Result<Value> {
|
||||
@@ -380,10 +312,16 @@ async fn query_to_json<T: GenericClient>(
|
||||
// big.
|
||||
pin_mut!(row_stream);
|
||||
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
|
||||
let mut current_size = 0;
|
||||
while let Some(row) = row_stream.next().await {
|
||||
let row = row?;
|
||||
*current_size += row.body_len();
|
||||
current_size += row.body_len();
|
||||
rows.push(row);
|
||||
if current_size > MAX_RESPONSE_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// grab the command tag and number of rows affected
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::{
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{
|
||||
server::{
|
||||
accept,
|
||||
@@ -17,6 +18,7 @@ use hyper::{
|
||||
};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
|
||||
use pin_project_lite::pin_project;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
@@ -202,7 +204,44 @@ async fn ws_handler(
|
||||
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
|
||||
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
|
||||
let result = sql_over_http::handle(request, sni_hostname, conn_pool, session_id)
|
||||
.instrument(info_span!("sql-over-http"))
|
||||
.await;
|
||||
let status_code = match result {
|
||||
Ok(_) => StatusCode::OK,
|
||||
Err(_) => StatusCode::BAD_REQUEST,
|
||||
};
|
||||
let (json, headers) = match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null,
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
(
|
||||
json!({ "message": message, "code": code }),
|
||||
HashMap::default(),
|
||||
)
|
||||
}
|
||||
};
|
||||
json_response(status_code, json).map(|mut r| {
|
||||
r.headers_mut().insert(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
for (k, v) in headers {
|
||||
r.headers_mut().insert(k, v);
|
||||
}
|
||||
r
|
||||
})
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
Response::builder()
|
||||
.header("Allow", "OPTIONS, POST")
|
||||
@@ -214,7 +253,7 @@ async fn ws_handler(
|
||||
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
|
||||
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
|
||||
.body(Body::empty())
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
.map_err(|e| ApiError::BadRequest(e.into()))
|
||||
} else {
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
|
||||
@@ -3,18 +3,9 @@
|
||||
use crate::{config::MetricCollectionConfig, http};
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
|
||||
use dashmap::{mapref::entry::Entry, DashMap};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info, instrument, trace};
|
||||
use serde::Serialize;
|
||||
use std::{collections::HashMap, convert::Infallible, time::Duration};
|
||||
use tracing::{error, info, instrument, trace, warn};
|
||||
|
||||
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
|
||||
|
||||
@@ -27,95 +18,12 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
|
||||
/// so while the project-id is unique across regions the whole pipeline will work correctly
|
||||
/// because we enrich the event with project_id in the control-plane endpoint.
|
||||
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
|
||||
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
|
||||
pub struct Ids {
|
||||
pub endpoint_id: String,
|
||||
pub branch_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricCounter {
|
||||
transmitted: AtomicU64,
|
||||
opened_connections: AtomicUsize,
|
||||
}
|
||||
|
||||
impl MetricCounter {
|
||||
/// Record that some bytes were sent from the proxy to the client
|
||||
pub fn record_egress(&self, bytes: u64) {
|
||||
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
|
||||
}
|
||||
|
||||
/// extract the value that should be reported
|
||||
fn should_report(self: &Arc<Self>) -> Option<u64> {
|
||||
// heuristic to see if the branch is still open
|
||||
// if a clone happens while we are observing, the heuristic will be incorrect.
|
||||
//
|
||||
// Worst case is that we won't report an event for this endpoint.
|
||||
// However, for the strong count to be 1 it must have occured that at one instant
|
||||
// all the endpoints were closed, so missing a report because the endpoints are closed is valid.
|
||||
let is_open = Arc::strong_count(self) > 1;
|
||||
let opened = self.opened_connections.swap(0, Ordering::AcqRel);
|
||||
|
||||
// update cached metrics eagerly, even if they can't get sent
|
||||
// (to avoid sending the same metrics twice)
|
||||
// see the relevant discussion on why to do so even if the status is not success:
|
||||
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
|
||||
let value = self.transmitted.swap(0, Ordering::AcqRel);
|
||||
|
||||
// Our only requirement is that we report in every interval if there was an open connection
|
||||
// if there were no opened connections since, then we don't need to report
|
||||
if value == 0 && !is_open && opened == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine whether the counter should be cleared from the global map.
|
||||
fn should_clear(self: &mut Arc<Self>) -> bool {
|
||||
// we can't clear this entry if it's acquired elsewhere
|
||||
let Some(counter) = Arc::get_mut(self) else {
|
||||
return false;
|
||||
};
|
||||
let opened = *counter.opened_connections.get_mut();
|
||||
let value = *counter.transmitted.get_mut();
|
||||
// clear if there's no data to report
|
||||
value == 0 && opened == 0
|
||||
}
|
||||
}
|
||||
|
||||
// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
|
||||
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Metrics {
|
||||
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
/// Register a new byte metrics counter for this endpoint
|
||||
pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
|
||||
let entry = if let Some(entry) = self.endpoints.get(&ids) {
|
||||
entry.clone()
|
||||
} else {
|
||||
self.endpoints
|
||||
.entry(ids)
|
||||
.or_insert_with(|| {
|
||||
Arc::new(MetricCounter {
|
||||
transmitted: AtomicU64::new(0),
|
||||
opened_connections: AtomicUsize::new(0),
|
||||
})
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
entry.opened_connections.fetch_add(1, Ordering::AcqRel);
|
||||
entry
|
||||
}
|
||||
}
|
||||
|
||||
pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
|
||||
|
||||
pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
|
||||
info!("metrics collector config: {config:?}");
|
||||
scopeguard::defer! {
|
||||
@@ -123,83 +31,145 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
|
||||
}
|
||||
|
||||
let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
|
||||
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
|
||||
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
|
||||
|
||||
let mut prev = Utc::now();
|
||||
let mut ticker = tokio::time::interval(config.interval);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
|
||||
let now = Utc::now();
|
||||
collect_metrics_iteration(
|
||||
&USAGE_METRICS,
|
||||
let res = collect_metrics_iteration(
|
||||
&http_client,
|
||||
&mut cached_metrics,
|
||||
&config.endpoint,
|
||||
&hostname,
|
||||
prev,
|
||||
now,
|
||||
)
|
||||
.await;
|
||||
prev = now;
|
||||
|
||||
match res {
|
||||
Err(e) => error!("failed to send consumption metrics: {e} "),
|
||||
Ok(_) => trace!("periodic metrics collection completed successfully"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
|
||||
let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
|
||||
let metrics = prometheus::default_registry().gather();
|
||||
|
||||
for m in metrics {
|
||||
if m.get_name() == "proxy_io_bytes_per_client" {
|
||||
for ms in m.get_metric() {
|
||||
let direction = ms
|
||||
.get_label()
|
||||
.iter()
|
||||
.find(|l| l.get_name() == "direction")
|
||||
.unwrap()
|
||||
.get_value();
|
||||
|
||||
// Only collect metric for outbound traffic
|
||||
if direction == "tx" {
|
||||
let endpoint_id = ms
|
||||
.get_label()
|
||||
.iter()
|
||||
.find(|l| l.get_name() == "endpoint_id")
|
||||
.unwrap()
|
||||
.get_value();
|
||||
let branch_id = ms
|
||||
.get_label()
|
||||
.iter()
|
||||
.find(|l| l.get_name() == "branch_id")
|
||||
.unwrap()
|
||||
.get_value();
|
||||
|
||||
let value = ms.get_counter().get_value() as u64;
|
||||
|
||||
// Report if the metric value is suspiciously large
|
||||
if value > (1u64 << 40) {
|
||||
warn!(
|
||||
"potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
|
||||
branch_id, endpoint_id, value
|
||||
);
|
||||
}
|
||||
|
||||
current_metrics.push((
|
||||
Ids {
|
||||
endpoint_id: endpoint_id.to_string(),
|
||||
branch_id: branch_id.to_string(),
|
||||
},
|
||||
(value, Utc::now()),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
current_metrics
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn collect_metrics_iteration(
|
||||
metrics: &Metrics,
|
||||
client: &http::ClientWithMiddleware,
|
||||
cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
hostname: &str,
|
||||
prev: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
metric_collection_endpoint
|
||||
);
|
||||
|
||||
let mut metrics_to_clear = Vec::new();
|
||||
let current_metrics = gather_proxy_io_bytes_per_client();
|
||||
|
||||
let metrics_to_send: Vec<(Ids, u64)> = metrics
|
||||
.endpoints
|
||||
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
|
||||
.iter()
|
||||
.filter_map(|counter| {
|
||||
let key = counter.key().clone();
|
||||
let Some(value) = counter.should_report() else {
|
||||
metrics_to_clear.push(key);
|
||||
return None;
|
||||
.filter_map(|(curr_key, (curr_val, curr_time))| {
|
||||
let mut start_time = *curr_time;
|
||||
let mut value = *curr_val;
|
||||
|
||||
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
|
||||
// Only send metrics updates if the metric has increased
|
||||
if curr_val > prev_val {
|
||||
value = curr_val - prev_val;
|
||||
start_time = *prev_time;
|
||||
} else {
|
||||
if curr_val < prev_val {
|
||||
error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
|
||||
prev_val, curr_val, curr_key);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some((key, value))
|
||||
|
||||
Some(Event {
|
||||
kind: EventType::Incremental {
|
||||
start_time,
|
||||
stop_time: *curr_time,
|
||||
},
|
||||
metric: PROXY_IO_BYTES_PER_CLIENT,
|
||||
idempotency_key: idempotency_key(hostname),
|
||||
value,
|
||||
extra: Ids {
|
||||
endpoint_id: curr_key.endpoint_id.clone(),
|
||||
branch_id: curr_key.branch_id.clone(),
|
||||
},
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
if metrics_to_send.is_empty() {
|
||||
trace!("no new metrics to send");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Send metrics.
|
||||
// Split into chunks of 1000 metrics to avoid exceeding the max request size
|
||||
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
|
||||
let events = chunk
|
||||
.iter()
|
||||
.map(|(ids, value)| Event {
|
||||
kind: EventType::Incremental {
|
||||
start_time: prev,
|
||||
stop_time: now,
|
||||
},
|
||||
metric: PROXY_IO_BYTES_PER_CLIENT,
|
||||
idempotency_key: idempotency_key(hostname),
|
||||
value: *value,
|
||||
extra: Ids {
|
||||
endpoint_id: ids.endpoint_id.clone(),
|
||||
branch_id: ids.branch_id.clone(),
|
||||
},
|
||||
})
|
||||
.collect();
|
||||
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.json(&EventChunk { events })
|
||||
.json(&EventChunk {
|
||||
events: chunk.into(),
|
||||
})
|
||||
.send()
|
||||
.await;
|
||||
|
||||
@@ -213,113 +183,34 @@ async fn collect_metrics_iteration(
|
||||
|
||||
if !res.status().is_success() {
|
||||
error!("metrics endpoint refused the sent metrics: {:?}", res);
|
||||
for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
|
||||
for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
|
||||
// Report if the metric value is suspiciously large
|
||||
error!("potentially abnormal metric value: {:?}", metric);
|
||||
}
|
||||
}
|
||||
}
|
||||
// update cached metrics after they were sent
|
||||
// (to avoid sending the same metrics twice)
|
||||
// see the relevant discussion on why to do so even if the status is not success:
|
||||
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
|
||||
for send_metric in chunk {
|
||||
let stop_time = match send_metric.kind {
|
||||
EventType::Incremental { stop_time, .. } => stop_time,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
for metric in metrics_to_clear {
|
||||
match metrics.endpoints.entry(metric) {
|
||||
Entry::Occupied(mut counter) => {
|
||||
if counter.get_mut().should_clear() {
|
||||
counter.remove_entry();
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {}
|
||||
cached_metrics
|
||||
.entry(Ids {
|
||||
endpoint_id: send_metric.extra.endpoint_id.clone(),
|
||||
branch_id: send_metric.extra.branch_id.clone(),
|
||||
})
|
||||
// update cached value (add delta) and time
|
||||
.and_modify(|e| {
|
||||
e.0 = e.0.saturating_add(send_metric.value);
|
||||
e.1 = stop_time
|
||||
})
|
||||
// cache new metric
|
||||
.or_insert((send_metric.value, stop_time));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
net::TcpListener,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use anyhow::Error;
|
||||
use chrono::Utc;
|
||||
use consumption_metrics::{Event, EventChunk};
|
||||
use hyper::{
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Response,
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use super::{collect_metrics_iteration, Ids, Metrics};
|
||||
use crate::http;
|
||||
|
||||
#[tokio::test]
|
||||
async fn metrics() {
|
||||
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let reports = Arc::new(Mutex::new(vec![]));
|
||||
let reports2 = reports.clone();
|
||||
|
||||
let server = hyper::server::Server::from_tcp(listener)
|
||||
.unwrap()
|
||||
.serve(make_service_fn(move |_| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| {
|
||||
let reports = reports.clone();
|
||||
async move {
|
||||
let bytes = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let events: EventChunk<'static, Event<Ids, String>> =
|
||||
serde_json::from_slice(&bytes)?;
|
||||
reports.lock().unwrap().push(events);
|
||||
Ok::<_, Error>(Response::new(Body::from(vec![])))
|
||||
}
|
||||
}))
|
||||
}
|
||||
}));
|
||||
let addr = server.local_addr();
|
||||
tokio::spawn(server);
|
||||
|
||||
let metrics = Metrics::default();
|
||||
let client = http::new_client();
|
||||
let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
|
||||
let now = Utc::now();
|
||||
|
||||
// no counters have been registered
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// register a new counter
|
||||
let counter = metrics.register(Ids {
|
||||
endpoint_id: "e1".to_string(),
|
||||
branch_id: "b1".to_string(),
|
||||
});
|
||||
|
||||
// the counter should be observed despite 0 egress
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 0);
|
||||
|
||||
// record egress
|
||||
counter.record_egress(1);
|
||||
|
||||
// egress should be observered
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
assert_eq!(r[0].events[0].value, 1);
|
||||
|
||||
// release counter
|
||||
drop(counter);
|
||||
|
||||
// we do not observe the counter
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// counter is unregistered
|
||||
assert!(metrics.endpoints.is_empty());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -129,18 +129,12 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
// exit for bad header
|
||||
let len = usize::min(self.buf.len(), HEADER.len());
|
||||
if self.buf[..len] != HEADER[..len] {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"invalid proxy protocol v2 header",
|
||||
)));
|
||||
return Poll::Ready(Ok(None));
|
||||
}
|
||||
|
||||
// if no more bytes available then exit
|
||||
if ready!(bytes_read) == 0 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"missing proxy protocol v2 header",
|
||||
)));
|
||||
return Poll::Ready(Ok(None));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -152,27 +146,27 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
let command = vc & 0b1111;
|
||||
if version != 2 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol version. expected version 2",
|
||||
)));
|
||||
}
|
||||
let local = match command {
|
||||
match command {
|
||||
// the connection was established on purpose by the proxy
|
||||
// without being relayed. The connection endpoints are the sender and the
|
||||
// receiver. Such connections exist when the proxy sends health-checks to the
|
||||
// server. The receiver must accept this connection as valid and must use the
|
||||
// real connection endpoints and discard the protocol block including the
|
||||
// family which is ignored.
|
||||
0 => true,
|
||||
0 => {}
|
||||
// the connection was established on behalf of another node,
|
||||
// and reflects the original connection endpoints. The receiver must then use
|
||||
// the information provided in the protocol block to get original the address.
|
||||
1 => false,
|
||||
1 => {}
|
||||
// other values are unassigned and must not be emitted by senders. Receivers
|
||||
// must drop connections presenting unexpected values here.
|
||||
_ => {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol command. expected local (0) or proxy (1)",
|
||||
)))
|
||||
}
|
||||
@@ -192,29 +186,8 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
|
||||
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
||||
0x21 | 0x22 => 36,
|
||||
|
||||
// - \x31 : UNIX stream : the forwarded connection uses SOCK_STREAM over the
|
||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
||||
// - \x32 : UNIX datagram : the forwarded connection uses SOCK_DGRAM over the
|
||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
||||
0x31 | 0x32 => 216,
|
||||
|
||||
// UNSPEC : the connection is forwarded for an unknown, unspecified
|
||||
// or unsupported protocol. The sender should use this family when sending
|
||||
// LOCAL commands or when dealing with unsupported protocol families. When
|
||||
// used with a LOCAL command, the receiver must accept the connection and
|
||||
// ignore any address information. For other commands, the receiver is free
|
||||
// to accept the connection anyway and use the real endpoints addresses or to
|
||||
// reject the connection. The receiver should ignore address information.
|
||||
0x00 | 0x01 | 0x02 | 0x10 | 0x20 | 0x30 if local => 0,
|
||||
|
||||
// unspecified or invalid. ignore the addresses
|
||||
_ => {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"invalid proxy protocol address family/transport protocol",
|
||||
)))
|
||||
}
|
||||
// unspecified or unix stream. ignore the addresses
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
// The 15th and 16th bytes is the address length in bytes in network endian order.
|
||||
@@ -446,7 +419,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "invalid proxy protocol v2 header")]
|
||||
async fn test_invalid() {
|
||||
let data = [0x55; 256];
|
||||
|
||||
@@ -454,15 +426,20 @@ mod tests {
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(read.state, ProxyParse::None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "missing proxy protocol v2 header")]
|
||||
async fn test_short() {
|
||||
let mut read = pin!(WithClientIp::new(&super::HEADER.as_slice()[..10]));
|
||||
let data = [0x55; 10];
|
||||
|
||||
let mut read = pin!(WithClientIp::new(data.as_slice()));
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(read.state, ProxyParse::None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -7,7 +7,6 @@ use crate::{
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
metrics::{Ids, USAGE_METRICS},
|
||||
protocol2::WithClientIp,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
@@ -603,11 +602,6 @@ pub async fn proxy_pass(
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: &MetricsAuxInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
let usage = USAGE_METRICS.register(Ids {
|
||||
endpoint_id: aux.endpoint_id.to_string(),
|
||||
branch_id: aux.branch_id.to_string(),
|
||||
});
|
||||
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(
|
||||
client,
|
||||
@@ -615,7 +609,6 @@ pub async fn proxy_pass(
|
||||
|cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
m_sent.inc_by(cnt as u64);
|
||||
usage.record_egress(cnt as u64);
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -105,8 +105,6 @@ class NeonCompare(PgCompare):
|
||||
self._pg_bin = pg_bin
|
||||
self.pageserver_http_client = self.env.pageserver.http_client()
|
||||
|
||||
# note that neon_simple_env now uses LOCAL_FS remote storage
|
||||
|
||||
# Create tenant
|
||||
tenant_conf: Dict[str, str] = {}
|
||||
if False: # TODO add pytest setting for this
|
||||
|
||||
@@ -460,11 +460,9 @@ class NeonEnvBuilder:
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
self.test_name = test_name
|
||||
|
||||
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
|
||||
def init_configs(self) -> NeonEnv:
|
||||
# Cannot create more than one environment from one builder
|
||||
assert self.env is None, "environment already initialized"
|
||||
if default_remote_storage_if_missing and self.pageserver_remote_storage is None:
|
||||
self.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
self.env = NeonEnv(self)
|
||||
return self.env
|
||||
|
||||
@@ -472,19 +470,8 @@ class NeonEnvBuilder:
|
||||
assert self.env is not None, "environment is not already initialized, call init() first"
|
||||
self.env.start()
|
||||
|
||||
def init_start(
|
||||
self,
|
||||
initial_tenant_conf: Optional[Dict[str, str]] = None,
|
||||
default_remote_storage_if_missing: bool = True,
|
||||
) -> NeonEnv:
|
||||
"""
|
||||
Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
|
||||
|
||||
To avoid creating initial_tenant, call init_configs to setup the environment.
|
||||
|
||||
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
|
||||
"""
|
||||
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
|
||||
def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> NeonEnv:
|
||||
env = self.init_configs()
|
||||
self.start()
|
||||
|
||||
# Prepare the default branch to start the postgres on later.
|
||||
@@ -559,7 +546,7 @@ class NeonEnvBuilder:
|
||||
user: RemoteStorageUser,
|
||||
bucket_name: Optional[str] = None,
|
||||
bucket_region: Optional[str] = None,
|
||||
) -> RemoteStorage:
|
||||
) -> Optional[RemoteStorage]:
|
||||
ret = kind.configure(
|
||||
self.repo_dir,
|
||||
self.mock_s3_server,
|
||||
@@ -902,8 +889,6 @@ def _shared_simple_env(
|
||||
"""
|
||||
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
|
||||
is set, this is shared by all tests using `neon_simple_env`.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
|
||||
if os.environ.get("TEST_SHARED_FIXTURES") is None:
|
||||
|
||||
@@ -202,6 +202,9 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
LOCAL_FS = "local_fs"
|
||||
MOCK_S3 = "mock_s3"
|
||||
REAL_S3 = "real_s3"
|
||||
# Pass to tests that are generic to remote storage
|
||||
# to ensure the test pass with or without the remote storage
|
||||
NOOP = "noop"
|
||||
|
||||
def configure(
|
||||
self,
|
||||
@@ -212,7 +215,10 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
user: RemoteStorageUser,
|
||||
bucket_name: Optional[str] = None,
|
||||
bucket_region: Optional[str] = None,
|
||||
) -> RemoteStorage:
|
||||
) -> Optional[RemoteStorage]:
|
||||
if self == RemoteStorageKind.NOOP:
|
||||
return None
|
||||
|
||||
if self == RemoteStorageKind.LOCAL_FS:
|
||||
return LocalFsStorage(LocalFsStorage.component_path(repo_dir, user))
|
||||
|
||||
|
||||
@@ -4,12 +4,7 @@ from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -31,18 +26,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
|
||||
|
||||
for _ in range(3):
|
||||
for _ in range(4):
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
endpoint.stop()
|
||||
tenant_timelines.append((tenant_id, timeline_id, endpoint))
|
||||
|
||||
# Stop the pageserver -- this has to be not immediate or we need to wait for uploads
|
||||
# Stop the pageserver
|
||||
env.pageserver.stop()
|
||||
|
||||
# Leave the first timeline alone, but corrupt the others in different ways
|
||||
@@ -51,21 +45,30 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
(tenant1, timeline1, pg1) = tenant_timelines[1]
|
||||
metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
|
||||
with open(metadata_path, "w") as f:
|
||||
f.write("overwritten with garbage!")
|
||||
f = open(metadata_path, "w")
|
||||
f.write("overwritten with garbage!")
|
||||
f.close()
|
||||
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
|
||||
|
||||
(tenant2, timeline2, pg2) = tenant_timelines[2]
|
||||
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
|
||||
for filename in os.listdir(timeline_path):
|
||||
if filename.startswith("00000"):
|
||||
# Looks like a layer file. Remove it
|
||||
os.remove(f"{timeline_path}/{filename}")
|
||||
log.info(
|
||||
f"Timeline {tenant2}/{timeline2} got its layer files removed (no remote storage enabled)"
|
||||
)
|
||||
|
||||
(tenant3, timeline3, pg3) = tenant_timelines[3]
|
||||
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant3}/timelines/{timeline3}/"
|
||||
for filename in os.listdir(timeline_path):
|
||||
if filename.startswith("00000"):
|
||||
# Looks like a layer file. Corrupt it
|
||||
p = f"{timeline_path}/{filename}"
|
||||
size = os.path.getsize(p)
|
||||
with open(p, "wb") as f:
|
||||
f.truncate(0)
|
||||
f.truncate(size)
|
||||
log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled")
|
||||
f = open(f"{timeline_path}/{filename}", "w")
|
||||
f.write("overwritten with garbage!")
|
||||
f.close()
|
||||
log.info(f"Timeline {tenant3}/{timeline3} got its layer files spoiled")
|
||||
|
||||
env.pageserver.start()
|
||||
|
||||
@@ -84,13 +87,22 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
|
||||
)
|
||||
|
||||
# Second timeline will fail during basebackup, because the local layer file is corrupt.
|
||||
# Second timeline has no ancestors, only the metadata file and no layer files locally,
|
||||
# and we don't have the remote storage enabled. It is loaded into memory, but getting
|
||||
# the basebackup from it will fail.
|
||||
with pytest.raises(
|
||||
Exception, match=f"Tenant {tenant2} will not become active. Current state: Broken"
|
||||
) as err:
|
||||
pg2.start()
|
||||
log.info(f"As expected, compute startup failed for timeline with missing layers: {err}")
|
||||
|
||||
# Third timeline will also fail during basebackup, because the layer file is corrupt.
|
||||
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
|
||||
# (We don't check layer file contents on startup, when loading the timeline)
|
||||
with pytest.raises(Exception, match="Failed to load delta layer") as err:
|
||||
pg2.start()
|
||||
pg3.start()
|
||||
log.info(
|
||||
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
|
||||
f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -74,13 +74,11 @@ class EvictionEnv:
|
||||
pgbench_init_lsns: Dict[TenantId, Lsn]
|
||||
|
||||
def timelines_du(self) -> Tuple[int, int, int]:
|
||||
return poor_mans_du(
|
||||
self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
|
||||
)
|
||||
return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid in self.timelines])
|
||||
|
||||
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
|
||||
return {
|
||||
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
|
||||
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)])[0]
|
||||
for tid, tlid in self.timelines
|
||||
}
|
||||
|
||||
@@ -91,21 +89,7 @@ class EvictionEnv:
|
||||
"""
|
||||
lsn = self.pgbench_init_lsns[tenant_id]
|
||||
with self.neon_env.endpoints.create_start("main", tenant_id=tenant_id, lsn=lsn) as endpoint:
|
||||
# instead of using pgbench --select-only which does point selects,
|
||||
# run full table scans for all tables
|
||||
with endpoint.connect() as conn:
|
||||
cur = conn.cursor()
|
||||
|
||||
tables_cols = {
|
||||
"pgbench_accounts": "abalance",
|
||||
"pgbench_tellers": "tbalance",
|
||||
"pgbench_branches": "bbalance",
|
||||
"pgbench_history": "delta",
|
||||
}
|
||||
|
||||
for table, column in tables_cols.items():
|
||||
cur.execute(f"select avg({column}) from {table}")
|
||||
_avg = cur.fetchone()
|
||||
self.pg_bin.run(["pgbench", "-S", endpoint.connstr()])
|
||||
|
||||
def pageserver_start_with_disk_usage_eviction(
|
||||
self, period, max_usage_pct, min_avail_bytes, mock_behavior
|
||||
@@ -143,19 +127,6 @@ class EvictionEnv:
|
||||
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
|
||||
|
||||
|
||||
def human_bytes(amt: float) -> str:
|
||||
suffixes = ["", "Ki", "Mi", "Gi"]
|
||||
|
||||
last = suffixes[-1]
|
||||
|
||||
for name in suffixes:
|
||||
if amt < 1024 or name == last:
|
||||
return f"{int(round(amt))} {name}B"
|
||||
amt = amt / 1024
|
||||
|
||||
raise RuntimeError("unreachable")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
"""
|
||||
@@ -244,12 +215,8 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
|
||||
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
|
||||
|
||||
broken_size_pre, _, _ = poor_mans_du(
|
||||
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
|
||||
)
|
||||
healthy_size_pre, _, _ = poor_mans_du(
|
||||
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
|
||||
)
|
||||
broken_size_pre, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
|
||||
healthy_size_pre, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
|
||||
|
||||
# try to evict everything, then validate that broken tenant wasn't touched
|
||||
target = broken_size_pre + healthy_size_pre
|
||||
@@ -257,12 +224,8 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
response = env.pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
broken_size_post, _, _ = poor_mans_du(
|
||||
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
|
||||
)
|
||||
healthy_size_post, _, _ = poor_mans_du(
|
||||
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
|
||||
)
|
||||
broken_size_post, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)])
|
||||
healthy_size_post, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)])
|
||||
|
||||
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
|
||||
assert healthy_size_post < healthy_size_pre
|
||||
@@ -403,16 +366,18 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
|
||||
du_by_timeline = env.du_by_timeline()
|
||||
|
||||
# pick any tenant
|
||||
[warm, cold] = list(du_by_timeline.keys())
|
||||
(tenant_id, timeline_id) = warm
|
||||
[our_tenant, other_tenant] = list(du_by_timeline.keys())
|
||||
(tenant_id, timeline_id) = our_tenant
|
||||
|
||||
# make picked tenant more recently used than the other one
|
||||
# make our tenant more recently used than the other one
|
||||
env.warm_up_tenant(tenant_id)
|
||||
|
||||
# Build up enough pressure to require evictions from both tenants,
|
||||
# but not enough to fall into global LRU.
|
||||
# So, set target to all occupied space, except 2*env.layer_size per tenant
|
||||
target = du_by_timeline[cold] + (du_by_timeline[warm] // 2) - 2 * 2 * env.layer_size
|
||||
# So, set target to all occipied space, except 2*env.layer_size per tenant
|
||||
target = (
|
||||
du_by_timeline[other_tenant] + (du_by_timeline[our_tenant] // 2) - 2 * 2 * env.layer_size
|
||||
)
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
@@ -427,33 +392,22 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
|
||||
later_tenant_usage < du_by_timeline[tenant]
|
||||
), "all tenants should have lost some layers"
|
||||
|
||||
warm_size = later_du_by_timeline[warm]
|
||||
|
||||
# bounds for warmed_size
|
||||
warm_lower = 0.5 * du_by_timeline[warm]
|
||||
|
||||
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
|
||||
# So, check for up to 3 here.
|
||||
warm_upper = warm_lower + 3 * env.layer_size
|
||||
|
||||
cold_size = later_du_by_timeline[cold]
|
||||
cold_upper = 2 * env.layer_size
|
||||
|
||||
log.info(
|
||||
f"expecting for warm tenant: {human_bytes(warm_lower)} < {human_bytes(warm_size)} < {human_bytes(warm_upper)}"
|
||||
)
|
||||
log.info(f"expecting for cold tenant: {human_bytes(cold_size)} < {human_bytes(cold_upper)}")
|
||||
|
||||
assert warm_size > warm_lower, "warmed up tenant should be at about half size (lower)"
|
||||
assert warm_size < warm_upper, "warmed up tenant should be at about half size (upper)"
|
||||
|
||||
assert (
|
||||
cold_size < cold_upper
|
||||
), "the cold tenant should be evicted to its min_resident_size, i.e., max layer file size"
|
||||
later_du_by_timeline[our_tenant] > 0.5 * du_by_timeline[our_tenant]
|
||||
), "our warmed up tenant should be at about half capacity, part 1"
|
||||
assert (
|
||||
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
|
||||
# So, check for up to 3 here.
|
||||
later_du_by_timeline[our_tenant]
|
||||
< 0.5 * du_by_timeline[our_tenant] + 3 * env.layer_size
|
||||
), "our warmed up tenant should be at about half capacity, part 2"
|
||||
assert (
|
||||
later_du_by_timeline[other_tenant] < 2 * env.layer_size
|
||||
), "the other tenant should be evicted to is min_resident_size, i.e., max layer file size"
|
||||
|
||||
|
||||
def poor_mans_du(
|
||||
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
|
||||
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]]
|
||||
) -> Tuple[int, int, int]:
|
||||
"""
|
||||
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
|
||||
@@ -476,11 +430,9 @@ def poor_mans_du(
|
||||
smallest_layer = min(smallest_layer, size)
|
||||
else:
|
||||
smallest_layer = size
|
||||
if verbose:
|
||||
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size} ({human_bytes(size)})")
|
||||
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}")
|
||||
|
||||
if verbose:
|
||||
log.info(f"{tenant_id}/{timeline_id}: sum {total} ({human_bytes(total)})")
|
||||
log.info(f"{tenant_id}/{timeline_id}: sum {total}")
|
||||
total_on_disk += total
|
||||
|
||||
assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0
|
||||
|
||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
@@ -16,13 +17,15 @@ from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
|
||||
)
|
||||
def test_metric_collection(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
@@ -52,7 +55,7 @@ def test_metric_collection(
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
@@ -65,14 +68,6 @@ def test_metric_collection(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
@@ -103,14 +98,17 @@ def test_metric_collection(
|
||||
total += sample[2]
|
||||
return int(total)
|
||||
|
||||
# upload some data to remote storage
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
|
||||
remote_uploaded = 0
|
||||
|
||||
remote_uploaded = get_num_remote_ops("index", "upload")
|
||||
assert remote_uploaded > 0
|
||||
# upload some data to remote storage
|
||||
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
|
||||
|
||||
remote_uploaded = get_num_remote_ops("index", "upload")
|
||||
assert remote_uploaded > 0
|
||||
|
||||
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
|
||||
# so give 5s we only want to get the following upload after "ready" value.
|
||||
@@ -213,14 +211,6 @@ def test_metric_collection_cleans_up_tempfile(
|
||||
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
@@ -30,7 +30,9 @@ from fixtures.types import TenantId
|
||||
from fixtures.utils import run_pg_bench_small
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
|
||||
)
|
||||
def test_tenant_delete_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
@@ -142,12 +144,18 @@ FAILPOINTS_BEFORE_BACKGROUND = [
|
||||
def combinations():
|
||||
result = []
|
||||
|
||||
remotes = [RemoteStorageKind.MOCK_S3]
|
||||
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
|
||||
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
|
||||
remotes.append(RemoteStorageKind.REAL_S3)
|
||||
|
||||
for remote_storage_kind in remotes:
|
||||
for delete_failpoint in FAILPOINTS:
|
||||
if remote_storage_kind is RemoteStorageKind.NOOP and delete_failpoint in (
|
||||
"timeline-delete-before-index-delete",
|
||||
):
|
||||
# the above failpoint are not relevant for config without remote storage
|
||||
continue
|
||||
|
||||
# Simulate failures for only one type of remote storage
|
||||
# to avoid log pollution and make tests run faster
|
||||
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
|
||||
@@ -207,18 +215,21 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
|
||||
# generate enough layers
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
if remote_storage_kind is RemoteStorageKind.NOOP:
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
else:
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
if remote_storage_kind in available_s3_storages():
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
if remote_storage_kind in available_s3_storages():
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
ps_http.configure_failpoints((failpoint, "return"))
|
||||
|
||||
@@ -249,7 +260,12 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
if failpoint in (
|
||||
if (
|
||||
remote_storage_kind is RemoteStorageKind.NOOP
|
||||
and failpoint == "tenant-delete-before-create-local-mark"
|
||||
):
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
|
||||
elif failpoint in (
|
||||
"tenant-delete-before-shutdown",
|
||||
"tenant-delete-before-create-remote-mark",
|
||||
):
|
||||
|
||||
@@ -519,8 +519,11 @@ def test_detach_while_attaching(
|
||||
# * restart the pageserver and verify that ignored tenant is still not loaded
|
||||
# * `load` the same tenant
|
||||
# * ensure that it's status is `Active` and it's present in pageserver's memory with all timelines
|
||||
def test_ignored_tenant_reattach(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3])
|
||||
def test_ignored_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.metrics import (
|
||||
PAGESERVER_GLOBAL_METRICS,
|
||||
PAGESERVER_PER_TENANT_METRICS,
|
||||
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
parse_metrics,
|
||||
)
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -231,10 +232,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
assert value
|
||||
|
||||
|
||||
def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind",
|
||||
# exercise both the code paths where remote_storage=None and remote_storage=Some(...)
|
||||
[RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3],
|
||||
)
|
||||
def test_pageserver_metrics_removed_after_detach(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
"""Tests that when a tenant is detached, the tenant specific metrics are not left behind"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
@@ -274,6 +282,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
for tenant in [tenant_1, tenant_2]:
|
||||
pre_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
|
||||
expected = set(PAGESERVER_PER_TENANT_METRICS)
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP:
|
||||
# if there's no remote storage configured, we don't expose the remote timeline client metrics
|
||||
expected -= set(PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS)
|
||||
assert pre_detach_samples == expected
|
||||
|
||||
env.pageserver.http_client().tenant_detach(tenant)
|
||||
@@ -283,7 +294,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
|
||||
|
||||
|
||||
# Check that empty tenants work with or without the remote storage
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
|
||||
)
|
||||
def test_pageserver_with_empty_tenants(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
|
||||
@@ -12,6 +12,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -144,12 +145,19 @@ DELETE_FAILPOINTS = [
|
||||
def combinations():
|
||||
result = []
|
||||
|
||||
remotes = [RemoteStorageKind.MOCK_S3]
|
||||
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
|
||||
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
|
||||
remotes.append(RemoteStorageKind.REAL_S3)
|
||||
|
||||
for remote_storage_kind in remotes:
|
||||
for delete_failpoint in DELETE_FAILPOINTS:
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
|
||||
"timeline-delete-before-index-delete",
|
||||
"timeline-delete-after-index-delete",
|
||||
):
|
||||
# the above failpoints are not relevant for config without remote storage
|
||||
continue
|
||||
|
||||
result.append((remote_storage_kind, delete_failpoint))
|
||||
return result
|
||||
|
||||
@@ -197,21 +205,23 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
with env.endpoints.create_start("delete") as endpoint:
|
||||
# generate enough layers
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
if remote_storage_kind is RemoteStorageKind.NOOP:
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline_id)
|
||||
else:
|
||||
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
|
||||
|
||||
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
|
||||
|
||||
if remote_storage_kind in available_s3_storages():
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(env.initial_tenant),
|
||||
"timelines",
|
||||
str(timeline_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
if remote_storage_kind in available_s3_storages():
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(env.initial_tenant),
|
||||
"timelines",
|
||||
str(timeline_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(f".*{timeline_id}.*failpoint: {failpoint}")
|
||||
# It appears when we stopped flush loop during deletion and then pageserver is stopped
|
||||
|
||||
@@ -301,8 +301,12 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
# message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"
|
||||
|
||||
|
||||
def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
|
||||
def test_timeline_physical_size_init(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -333,12 +337,17 @@ def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
assert_physical_size_invariants(
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
|
||||
remote_storage_kind,
|
||||
)
|
||||
|
||||
|
||||
def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
|
||||
def test_timeline_physical_size_post_checkpoint(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -360,14 +369,19 @@ def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder
|
||||
|
||||
def check():
|
||||
assert_physical_size_invariants(
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
|
||||
remote_storage_kind,
|
||||
)
|
||||
|
||||
wait_until(10, 1, check)
|
||||
|
||||
|
||||
def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
|
||||
def test_timeline_physical_size_post_compaction(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
# Disable background compaction as we don't want it to happen after `get_physical_size` request
|
||||
# and before checking the expected size on disk, which makes the assertion failed
|
||||
@@ -406,15 +420,21 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
|
||||
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
|
||||
|
||||
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
|
||||
if remote_storage_kind is not None:
|
||||
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
|
||||
|
||||
assert_physical_size_invariants(
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
|
||||
remote_storage_kind,
|
||||
)
|
||||
|
||||
|
||||
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
|
||||
def test_timeline_physical_size_post_gc(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
# Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
|
||||
# and before checking the expected size on disk, which makes the assertion failed
|
||||
@@ -451,10 +471,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
|
||||
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
|
||||
|
||||
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
|
||||
if remote_storage_kind is not None:
|
||||
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
|
||||
|
||||
assert_physical_size_invariants(
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
|
||||
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
|
||||
remote_storage_kind,
|
||||
)
|
||||
|
||||
|
||||
@@ -538,10 +560,14 @@ def test_timeline_size_metrics(
|
||||
assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)
|
||||
|
||||
|
||||
def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
|
||||
def test_tenant_physical_size(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
random.seed(100)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -549,10 +575,12 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant, timeline = env.neon_cli.create_tenant()
|
||||
if remote_storage_kind is not None:
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
|
||||
|
||||
def get_timeline_resident_physical_size(timeline: TimelineId):
|
||||
sizes = get_physical_size_values(env, tenant, timeline)
|
||||
assert_physical_size_invariants(sizes)
|
||||
sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
|
||||
assert_physical_size_invariants(sizes, remote_storage_kind)
|
||||
return sizes.prometheus_resident_physical
|
||||
|
||||
timeline_total_resident_physical_size = get_timeline_resident_physical_size(timeline)
|
||||
@@ -572,7 +600,8 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant, timeline)
|
||||
pageserver_http.timeline_checkpoint(tenant, timeline)
|
||||
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
|
||||
if remote_storage_kind is not None:
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
|
||||
|
||||
timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
|
||||
|
||||
@@ -601,6 +630,7 @@ def get_physical_size_values(
|
||||
env: NeonEnv,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
remote_storage_kind: Optional[RemoteStorageKind],
|
||||
) -> TimelinePhysicalSizeValues:
|
||||
res = TimelinePhysicalSizeValues()
|
||||
|
||||
@@ -616,9 +646,12 @@ def get_physical_size_values(
|
||||
res.prometheus_resident_physical = metrics.query_one(
|
||||
"pageserver_resident_physical_size", metrics_filter
|
||||
).value
|
||||
res.prometheus_remote_physical = metrics.query_one(
|
||||
"pageserver_remote_physical_size", metrics_filter
|
||||
).value
|
||||
if remote_storage_kind is not None:
|
||||
res.prometheus_remote_physical = metrics.query_one(
|
||||
"pageserver_remote_physical_size", metrics_filter
|
||||
).value
|
||||
else:
|
||||
res.prometheus_remote_physical = None
|
||||
|
||||
detail = client.timeline_detail(
|
||||
tenant_id, timeline_id, include_timeline_dir_layer_file_size_sum=True
|
||||
@@ -631,15 +664,20 @@ def get_physical_size_values(
|
||||
return res
|
||||
|
||||
|
||||
def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
|
||||
def assert_physical_size_invariants(
|
||||
sizes: TimelinePhysicalSizeValues, remote_storage_kind: Optional[RemoteStorageKind]
|
||||
):
|
||||
# resident phyiscal size is defined as
|
||||
assert sizes.python_timelinedir_layerfiles_physical == sizes.prometheus_resident_physical
|
||||
assert sizes.python_timelinedir_layerfiles_physical == sizes.layer_map_file_size_sum
|
||||
|
||||
# we don't do layer eviction, so, all layers are resident
|
||||
assert sizes.api_current_physical == sizes.prometheus_resident_physical
|
||||
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
|
||||
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
|
||||
if remote_storage_kind is not None:
|
||||
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
|
||||
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
|
||||
else:
|
||||
assert sizes.prometheus_remote_physical is None
|
||||
|
||||
|
||||
# Timeline logical size initialization is an asynchronous background task that runs once,
|
||||
|
||||
Reference in New Issue
Block a user